summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJorge Hernandez <jorge.hernandez-herrero@att.com>2019-03-04 20:56:21 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-04 20:56:21 +0000
commite7ad7f65ed1c66b6ec2af6d415a386c931bfe20c (patch)
treed116387dd3add30cb13015a70f56baf5a74f7d7f
parentaee89a9ca97ba75b9cbb7cf5eba545a8e78acc09 (diff)
parent265f24eb2a14ec15f397501212cb7eb887cc1f26 (diff)
Merge "Add various listener classes"
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java74
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java98
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java150
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java91
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java40
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java120
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java180
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java309
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java194
9 files changed, 1256 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java
new file mode 100644
index 00000000..ff8cbc5b
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for messages received on a topic, in JSON format, decodes them into a
+ * {@link StandardCoderObject}, and then offers the objects to the subclass.
+ */
+public abstract class JsonListener implements TopicListener {
+ private static final Logger logger = LoggerFactory.getLogger(JsonListener.class);
+
+ /**
+ * Used to decode the event.
+ */
+ private static final Coder coder = new StandardCoder();
+
+ /**
+ * Constructs the object.
+ */
+ public JsonListener() {
+ super();
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, String event) {
+ // decode from JSON into a standard object
+ StandardCoderObject sco;
+ try {
+ sco = coder.decode(event, StandardCoderObject.class);
+
+ } catch (CoderException e) {
+ logger.warn("unable to decode: {}", event, e);
+ return;
+ }
+
+ onTopicEvent(infra, topic, sco);
+ }
+
+ /**
+ * Indicates that a standard object was received.
+ *
+ * @param infra infrastructure with which the message was received
+ * @param topic topic on which the message was received
+ * @param sco the standard object that was received
+ */
+ public abstract void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco);
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java
new file mode 100644
index 00000000..195a7eec
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches standard objects to listeners, based on the message type extracted from the
+ * message. Only one listener may be registered for a given type.
+ */
+public class MessageTypeDispatcher extends JsonListener {
+ private static final Logger logger = LoggerFactory.getLogger(MessageTypeDispatcher.class);
+
+ /**
+ * Name of the message field, which may be hierarchical.
+ */
+ private final String[] messageFieldNames;
+
+ /**
+ * Name of the message field, joined with "." - for logging.
+ */
+ private final String fullMessageFieldName;
+
+ /**
+ * Maps a message type to its listener.
+ */
+ private final ConcurrentHashMap<String, ScoListener<?>> type2listener = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param messageFieldNames name of the message field, which may be hierarchical
+ */
+ public MessageTypeDispatcher(String... messageFieldNames) {
+ this.messageFieldNames = messageFieldNames;
+ this.fullMessageFieldName = String.join(".", messageFieldNames);
+ }
+
+ /**
+ * Registers a listener for a certain type of message.
+ *
+ * @param type type of message of interest to the listener
+ * @param listener listener to register
+ */
+ public <T> void register(String type, ScoListener<T> listener) {
+ type2listener.put(type, listener);
+ }
+
+ /**
+ * Unregisters the listener associated with the specified message type.
+ *
+ * @param type type of message whose listener is to be unregistered
+ */
+ public void unregister(String type) {
+ type2listener.remove(type);
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) {
+ // extract the message type
+ final String type = sco.getString(messageFieldNames);
+ if (type == null) {
+ logger.warn("unable to extract {}: {}", fullMessageFieldName, sco);
+ return;
+ }
+
+ // dispatch the message
+ ScoListener<?> listener = type2listener.get(type);
+ if (listener == null) {
+ logger.info("discarding event of type {}", type);
+ return;
+ }
+
+ listener.onTopicEvent(infra, topic, sco);
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java
new file mode 100644
index 00000000..9ba73c9b
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java
@@ -0,0 +1,150 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import com.google.common.base.Strings;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches messages to listeners based on the request id extracted from the message. A
+ * listener may be registered for a specific request id or for messages that have no
+ * request id (i.e., autonomous messages). Note: only one listener may be registered for a
+ * specific request id.
+ *
+ * @param <T> type of message/POJO this handles
+ */
+public class RequestIdDispatcher<T> extends ScoListener<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(RequestIdDispatcher.class);
+
+ /**
+ * Name of the request id field, which may be hierarchical.
+ */
+ private final String[] requestIdFieldNames;
+
+ /**
+ * Listeners for autonomous messages.
+ */
+ private final ConcurrentLinkedQueue<TypedMessageListener<T>> listeners = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Listeners for specific request ids.
+ */
+ private final ConcurrentHashMap<String, TypedMessageListener<T>> req2listener = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param clazz class of message this handles
+ * @param requestIdFieldNames name of the request id field, which may be hierarchical
+ */
+ public RequestIdDispatcher(Class<T> clazz, String... requestIdFieldNames) {
+ super(clazz);
+ this.requestIdFieldNames = requestIdFieldNames;
+ }
+
+ /**
+ * Registers a listener for autonomous messages.
+ *
+ * @param listener listener to be registered
+ */
+ public void register(TypedMessageListener<T> listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Registers a listener for a particular request id.
+ *
+ * @param reqid request id of interest
+ * @param listener listener to be registered
+ */
+ public void register(String reqid, TypedMessageListener<T> listener) {
+ if (Strings.isNullOrEmpty(reqid)) {
+ throw new IllegalArgumentException("attempt to register a listener with an empty request id");
+ }
+
+ req2listener.put(reqid, listener);
+ }
+
+ /**
+ * Unregisters a listener for autonomous messages.
+ *
+ * @param listener listener to be unregistered
+ */
+ public void unregister(TypedMessageListener<T> listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Unregisters the listener associated with a particular request id.
+ *
+ * @param reqid request id whose listener is to be unregistered
+ */
+ public void unregister(String reqid) {
+ req2listener.remove(reqid);
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
+
+ // extract the request id
+ String reqid = sco.getString(requestIdFieldNames);
+
+ // dispatch the message
+ if (Strings.isNullOrEmpty(reqid)) {
+ // it's an autonomous message - offer it to all autonomous listeners
+ for (TypedMessageListener<T> listener : listeners) {
+ offerToListener(infra, topic, message, listener);
+ }
+
+ } else {
+ // it's a response to a particular request
+ offerToListener(infra, topic, message, req2listener.get(reqid));
+ }
+ }
+
+ /**
+ * Offers a message to a listener.
+ *
+ * @param infra infrastructure on which the message was received
+ * @param topic topic on which the message was received
+ * @param msg message that was received
+ * @param listener listener to which the message should be offered, or {@code null}
+ */
+ private void offerToListener(CommInfrastructure infra, String topic, T msg, TypedMessageListener<T> listener) {
+
+ if (listener == null) {
+ return;
+ }
+
+ try {
+ listener.onTopicEvent(infra, topic, msg);
+
+ } catch (RuntimeException e) {
+ logger.warn("listener {} failed to process message: {}", listener, msg, e);
+ }
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java
new file mode 100644
index 00000000..a3d33965
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for receipt of a {@link StandardCoderObject}, translating it into an object of
+ * the appropriate type, and then passing it to the subclass.
+ *
+ * @param <T> type of message/POJO this handles
+ */
+public abstract class ScoListener<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ScoListener.class);
+
+ /**
+ * Used to translate the standard object to an object of type "T".
+ */
+ private static final Coder coder = new StandardCoder();
+
+ /**
+ * Class of message this handles.
+ */
+ private final Class<T> clazz;
+
+ /**
+ * Constructs the object.
+ *
+ * @param clazz class of message this handles
+ */
+ public ScoListener(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ /**
+ * Receives an event, translates it into the desired type of object, and passes it to
+ * the subclass.
+ *
+ * @param infra infrastructure with which the message was received
+ * @param topic topic on which the message was received
+ * @param sco event that was received
+ */
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) {
+ // translate the event to the desired object type
+ final T msg;
+ try {
+ msg = coder.fromStandard(sco, clazz);
+
+ } catch (CoderException e) {
+ logger.warn("unable to decode {}: {}", clazz.getName(), sco, e);
+ return;
+ }
+
+ onTopicEvent(infra, topic, sco, msg);
+ }
+
+ /**
+ * Indicates that a message was received.
+ *
+ * @param infra infrastructure with which the message was received
+ * @param topic topic on which the message was received
+ * @param sco event that was received
+ * @param message message that was received
+ */
+ public abstract void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message);
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java
new file mode 100644
index 00000000..ab381205
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+
+/**
+ * Listener for messages of a certain type.
+ *
+ * @param <T> type of message/POJO this handles
+ */
+public interface TypedMessageListener<T> {
+
+ /**
+ * Handles receipt of a message.
+ *
+ * @param infra infrastructure with which the message was received
+ * @param topic topic on which the message was received
+ * @param message message that was received
+ */
+ void onTopicEvent(CommInfrastructure infra, String topic, T message);
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java
new file mode 100644
index 00000000..39fc9d8f
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.slf4j.LoggerFactory;
+
+public class JsonListenerTest {
+
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(JsonListener.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ /**
+ * Original logging level for the logger.
+ */
+ private static Level saveLevel;
+
+ private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+ private static final String TOPIC = "my-topic";
+ private static final String JSON = "{'abc':'def'}".replace('\'', '"');
+
+ private JsonListener primary;
+
+ /**
+ * Initializes statics.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveLevel = logger.getLevel();
+ logger.setLevel(Level.INFO);
+
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ logger.setLevel(saveLevel);
+ appender.stop();
+ }
+
+ /**
+ * Initializes mocks and a listener.
+ */
+ @Before
+ public void setUp() {
+ appender.clearExtractions();
+
+ primary = new JsonListener() {
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) {}
+ };
+ }
+
+ @After
+ public void tearDown() {
+ logger.detachAppender(appender);
+ }
+
+ @Test
+ public void testOnTopicEvent() {
+ logger.addAppender(appender);
+
+ primary = spy(primary);
+
+ // success
+ primary.onTopicEvent(INFRA, TOPIC, JSON);
+ verify(primary).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+
+ // repeat
+ primary.onTopicEvent(INFRA, TOPIC, JSON);
+ verify(primary, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+
+ assertFalse(appender.getExtracted().toString().contains("unable to decode"));
+
+ // invalid json - decode fails
+ appender.clearExtractions();
+ primary.onTopicEvent(INFRA, TOPIC, "[");
+ assertTrue(appender.getExtracted().toString().contains("unable to decode"));
+ verify(primary, times(2)).onTopicEvent(any(), any(), any(StandardCoderObject.class));
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java
new file mode 100644
index 00000000..0edcfe12
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java
@@ -0,0 +1,180 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.slf4j.LoggerFactory;
+
+public class MessageTypeDispatcherTest {
+
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(MessageTypeDispatcher.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ /**
+ * Original logging level for the logger.
+ */
+ private static Level saveLevel;
+
+ private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+ private static final String TYPE_FIELD = "msg-type";
+ private static final String TOPIC = "my-topic";
+ private static final String TYPE1 = "msg-type-1";
+ private static final String TYPE2 = "msg-type-2";
+
+ private MessageTypeDispatcher primary;
+
+ private ScoListener<String> secondary1;
+ private ScoListener<String> secondary2;
+
+ /**
+ * Initializes statics.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveLevel = logger.getLevel();
+ logger.setLevel(Level.INFO);
+
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ logger.setLevel(saveLevel);
+ appender.stop();
+ }
+
+ /**
+ * Initializes mocks and a listener.
+ */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ appender.clearExtractions();
+
+ secondary1 = mock(ScoListener.class);
+ secondary2 = mock(ScoListener.class);
+
+ primary = new MessageTypeDispatcher(TYPE_FIELD);
+ }
+
+ @After
+ public void tearDown() {
+ logger.detachAppender(appender);
+ }
+
+ @Test
+ public void testRegister_testUnregister() {
+ primary.register(TYPE1, secondary1);
+ primary.register(TYPE2, secondary2);
+
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ verify(secondary2, never()).onTopicEvent(any(), any(), any());
+
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ verify(secondary2, never()).onTopicEvent(any(), any(), any());
+
+ primary.unregister(TYPE1);
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ verify(secondary2, never()).onTopicEvent(any(), any(), any());
+
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+
+ // unregister again
+ primary.unregister(TYPE1);
+
+ // unregister second type
+ primary.unregister(TYPE2);
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ verify(secondary2, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+ }
+
+ @Test
+ public void testOnTopicEvent() {
+ primary.register(TYPE1, secondary1);
+
+ logger.addAppender(appender);
+
+ // success
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+
+ // repeat
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class));
+
+ assertFalse(appender.getExtracted().toString().contains("unable to extract"));
+ assertFalse(appender.getExtracted().toString().contains("discarding event of type"));
+
+ // no message type
+ appender.clearExtractions();
+ primary.onTopicEvent(INFRA, TOPIC, "{}");
+ assertTrue(appender.getExtracted().toString().contains("unable to extract"));
+ verify(secondary1, times(2)).onTopicEvent(any(), any(), any());
+
+ // unknown type
+ appender.clearExtractions();
+ primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2));
+ assertTrue(appender.getExtracted().toString().contains("discarding event of type"));
+ verify(secondary1, times(2)).onTopicEvent(any(), any(), any());
+ }
+
+ /**
+ * Makes a JSON message of the given type.
+ *
+ * @param msgType the message type
+ * @return a JSON message of the given type
+ */
+ private String makeMessage(String msgType) {
+ String json = "{'" + TYPE_FIELD + "':'" + msgType + "', 'abc':'def'}";
+ return json.replace('\'', '"');
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java
new file mode 100644
index 00000000..6115a57e
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java
@@ -0,0 +1,309 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.slf4j.LoggerFactory;
+
+public class RequestIdDispatcherTest {
+
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(RequestIdDispatcher.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ /**
+ * Original logging level for the logger.
+ */
+ private static Level saveLevel;
+
+ private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+ private static final String REQID_FIELD = "requestId";
+ private static final String TOPIC = "my-topic";
+ private static final String REQID1 = "request-1";
+ private static final String REQID2 = "request-2";
+
+ private static final Coder coder = new StandardCoder();
+
+ private RequestIdDispatcher<MyMessage> primary;
+ private TypedMessageListener<MyMessage> secondary1;
+ private TypedMessageListener<MyMessage> secondary2;
+ private TypedMessageListener<MyMessage> secondary3;
+ private TypedMessageListener<MyMessage> secondary4;
+ private MyMessage status;
+
+ /**
+ * Initializes statics.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveLevel = logger.getLevel();
+ logger.setLevel(Level.INFO);
+
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ logger.setLevel(saveLevel);
+ appender.stop();
+ }
+
+ /**
+ * Create various mocks and primary listener.
+ */
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() {
+ appender.clearExtractions();
+
+ secondary1 = mock(TypedMessageListener.class);
+ secondary2 = mock(TypedMessageListener.class);
+ secondary3 = mock(TypedMessageListener.class);
+ secondary4 = mock(TypedMessageListener.class);
+
+ primary = new RequestIdDispatcher<>(MyMessage.class, REQID_FIELD);
+ }
+
+ @After
+ public void tearDown() {
+ logger.detachAppender(appender);
+ }
+
+ @Test
+ public void testRegisterMessageListener() {
+ primary.register(secondary1);
+
+ // should process message that does not have a request id
+ status = new MyMessage();
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // should process again
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // should NOT process a message that has a request id
+ status = new MyMessage(REQID1);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ }
+
+ @Test
+ public void testRegisterStringMessageListener() {
+ primary.register(REQID1, secondary1);
+
+ // should NOT process message that does not have a request id
+ status = new MyMessage();
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // should process a message that has the desired request id
+ status = new MyMessage(REQID1);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // should process again
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // should NOT process a message that does NOT have the desired request id
+ status = new MyMessage(REQID2);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // null request id => exception
+ assertThatIllegalArgumentException().isThrownBy(() -> primary.register(null, secondary1));
+
+ // empty request id => exception
+ assertThatIllegalArgumentException().isThrownBy(() -> primary.register("", secondary1));
+ }
+
+ @Test
+ public void testUnregisterMessageListener() {
+ primary.register(secondary1);
+ primary.register(secondary2);
+
+ // should process message
+ status = new MyMessage();
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ primary.unregister(secondary1);
+
+ // should NOT process again
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // other listener should still have processed it
+ verify(secondary2, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ }
+
+ @Test
+ public void testUnregisterString() {
+ primary.register(REQID1, secondary1);
+ primary.register(REQID2, secondary2);
+
+ // should process a message that has the desired request id
+ status = new MyMessage(REQID1);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ primary.unregister(REQID1);
+
+ // should NOT re-process
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // secondary should still be able to process
+ status = new MyMessage(REQID2);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ }
+
+ @Test
+ public void testOnTopicEvent() {
+ primary.register(REQID1, secondary1);
+ primary.register(REQID2, secondary2);
+ primary.register(secondary3);
+ primary.register(secondary4);
+
+ // without request id
+ status = new MyMessage();
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary2, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary3).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary4).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+
+ // with request id
+ status = new MyMessage(REQID1);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary2, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary3, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ verify(secondary4, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status));
+ }
+
+ @Test
+ public void testOfferToListener() {
+ logger.addAppender(appender);
+
+ // no listener for this
+ status = new MyMessage(REQID1);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+
+ assertFalse(appender.getExtracted().toString().contains("failed to process message"));
+
+ // listener throws an exception
+ primary.register(secondary1);
+
+ status = new MyMessage();
+
+ RuntimeException ex = new RuntimeException("expected exception");
+ doThrow(ex).when(secondary1).onTopicEvent(INFRA, TOPIC, status);
+
+ primary.onTopicEvent(INFRA, TOPIC, makeSco(status));
+ assertTrue(appender.getExtracted().toString().contains("failed to process message"));
+ }
+
+ /**
+ * Makes a standard object from a status message.
+ *
+ * @param source message to be converted
+ * @return a standard object representing the message
+ */
+ private StandardCoderObject makeSco(MyMessage source) {
+ try {
+ return coder.toStandard(source);
+
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static class MyMessage {
+ private String requestId;
+
+ public MyMessage() {
+ super();
+ }
+
+ public MyMessage(String requestId) {
+ this.requestId = requestId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((requestId == null) ? 0 : requestId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MyMessage other = (MyMessage) obj;
+ if (requestId == null) {
+ if (other.requestId != null) {
+ return false;
+ }
+ } else if (!requestId.equals(other.requestId)) {
+ return false;
+ }
+ return true;
+ }
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java
new file mode 100644
index 00000000..0749f891
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java
@@ -0,0 +1,194 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.listeners;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.slf4j.LoggerFactory;
+
+public class ScoListenerTest {
+
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(ScoListener.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ /**
+ * Original logging level for the logger.
+ */
+ private static Level saveLevel;
+
+ private static final CommInfrastructure INFRA = CommInfrastructure.NOOP;
+ private static final String TOPIC = "my-topic";
+ private static final String NAME = "pdp_1";
+
+ private static final Coder coder = new StandardCoder();
+
+ private ScoListener<MyMessage> primary;
+ private MyMessage status;
+ private StandardCoderObject sco;
+
+ /**
+ * Initializes statics.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ saveLevel = logger.getLevel();
+ logger.setLevel(Level.INFO);
+
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ logger.setLevel(saveLevel);
+ appender.stop();
+ }
+
+ /**
+ * Create various mocks and primary handler.
+ */
+ @Before
+ public void setUp() {
+ appender.clearExtractions();
+
+ primary = new ScoListener<MyMessage>(MyMessage.class) {
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco,
+ MyMessage message) {}
+ };
+ }
+
+ @After
+ public void tearDown() {
+ logger.detachAppender(appender);
+ }
+
+ @Test
+ public void testOnTopicEvent() {
+ primary = spy(primary);
+
+ status = new MyMessage(NAME);
+ sco = makeSco(status);
+ primary.onTopicEvent(INFRA, TOPIC, sco);
+ verify(primary).onTopicEvent(eq(INFRA), eq(TOPIC), eq(sco), eq(status));
+
+ assertFalse(appender.getExtracted().toString().contains("unable to decode"));
+
+ // undecodable message
+ logger.addAppender(appender);
+ primary.onTopicEvent(INFRA, TOPIC, makeSco("[]"));
+ verify(primary, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(sco), eq(status));
+ assertTrue(appender.getExtracted().toString().contains("unable to decode"));
+ }
+
+ /**
+ * Makes a standard object from a JSON string.
+ *
+ * @param source message to be converted
+ * @return a standard object representing the message
+ */
+ private StandardCoderObject makeSco(String source) {
+ try {
+ return coder.decode(source, StandardCoderObject.class);
+
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Makes a standard object from a status message.
+ *
+ * @param source message to be converted
+ * @return a standard object representing the message
+ */
+ private StandardCoderObject makeSco(MyMessage source) {
+ try {
+ return coder.toStandard(source);
+
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static class MyMessage {
+ private String name;
+
+ public MyMessage() {
+ super();
+ }
+
+ public MyMessage(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MyMessage other = (MyMessage) obj;
+ if (name == null) {
+ if (other.name != null) {
+ return false;
+ }
+ } else if (!name.equals(other.name)) {
+ return false;
+ }
+ return true;
+ }
+ }
+}