diff options
author | Jorge Hernandez <jorge.hernandez-herrero@att.com> | 2019-03-04 20:56:21 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-03-04 20:56:21 +0000 |
commit | e7ad7f65ed1c66b6ec2af6d415a386c931bfe20c (patch) | |
tree | d116387dd3add30cb13015a70f56baf5a74f7d7f | |
parent | aee89a9ca97ba75b9cbb7cf5eba545a8e78acc09 (diff) | |
parent | 265f24eb2a14ec15f397501212cb7eb887cc1f26 (diff) |
Merge "Add various listener classes"
9 files changed, 1256 insertions, 0 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java new file mode 100644 index 00000000..ff8cbc5b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/JsonListener.java @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for messages received on a topic, in JSON format, decodes them into a + * {@link StandardCoderObject}, and then offers the objects to the subclass. + */ +public abstract class JsonListener implements TopicListener { + private static final Logger logger = LoggerFactory.getLogger(JsonListener.class); + + /** + * Used to decode the event. + */ + private static final Coder coder = new StandardCoder(); + + /** + * Constructs the object. + */ + public JsonListener() { + super(); + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, String event) { + // decode from JSON into a standard object + StandardCoderObject sco; + try { + sco = coder.decode(event, StandardCoderObject.class); + + } catch (CoderException e) { + logger.warn("unable to decode: {}", event, e); + return; + } + + onTopicEvent(infra, topic, sco); + } + + /** + * Indicates that a standard object was received. + * + * @param infra infrastructure with which the message was received + * @param topic topic on which the message was received + * @param sco the standard object that was received + */ + public abstract void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java new file mode 100644 index 00000000..195a7eec --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcher.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dispatches standard objects to listeners, based on the message type extracted from the + * message. Only one listener may be registered for a given type. + */ +public class MessageTypeDispatcher extends JsonListener { + private static final Logger logger = LoggerFactory.getLogger(MessageTypeDispatcher.class); + + /** + * Name of the message field, which may be hierarchical. + */ + private final String[] messageFieldNames; + + /** + * Name of the message field, joined with "." - for logging. + */ + private final String fullMessageFieldName; + + /** + * Maps a message type to its listener. + */ + private final ConcurrentHashMap<String, ScoListener<?>> type2listener = new ConcurrentHashMap<>(); + + /** + * Constructs the object. + * + * @param messageFieldNames name of the message field, which may be hierarchical + */ + public MessageTypeDispatcher(String... messageFieldNames) { + this.messageFieldNames = messageFieldNames; + this.fullMessageFieldName = String.join(".", messageFieldNames); + } + + /** + * Registers a listener for a certain type of message. + * + * @param type type of message of interest to the listener + * @param listener listener to register + */ + public <T> void register(String type, ScoListener<T> listener) { + type2listener.put(type, listener); + } + + /** + * Unregisters the listener associated with the specified message type. + * + * @param type type of message whose listener is to be unregistered + */ + public void unregister(String type) { + type2listener.remove(type); + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) { + // extract the message type + final String type = sco.getString(messageFieldNames); + if (type == null) { + logger.warn("unable to extract {}: {}", fullMessageFieldName, sco); + return; + } + + // dispatch the message + ScoListener<?> listener = type2listener.get(type); + if (listener == null) { + logger.info("discarding event of type {}", type); + return; + } + + listener.onTopicEvent(infra, topic, sco); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java new file mode 100644 index 00000000..9ba73c9b --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcher.java @@ -0,0 +1,150 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import com.google.common.base.Strings; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Dispatches messages to listeners based on the request id extracted from the message. A + * listener may be registered for a specific request id or for messages that have no + * request id (i.e., autonomous messages). Note: only one listener may be registered for a + * specific request id. + * + * @param <T> type of message/POJO this handles + */ +public class RequestIdDispatcher<T> extends ScoListener<T> { + + private static final Logger logger = LoggerFactory.getLogger(RequestIdDispatcher.class); + + /** + * Name of the request id field, which may be hierarchical. + */ + private final String[] requestIdFieldNames; + + /** + * Listeners for autonomous messages. + */ + private final ConcurrentLinkedQueue<TypedMessageListener<T>> listeners = new ConcurrentLinkedQueue<>(); + + /** + * Listeners for specific request ids. + */ + private final ConcurrentHashMap<String, TypedMessageListener<T>> req2listener = new ConcurrentHashMap<>(); + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param requestIdFieldNames name of the request id field, which may be hierarchical + */ + public RequestIdDispatcher(Class<T> clazz, String... requestIdFieldNames) { + super(clazz); + this.requestIdFieldNames = requestIdFieldNames; + } + + /** + * Registers a listener for autonomous messages. + * + * @param listener listener to be registered + */ + public void register(TypedMessageListener<T> listener) { + listeners.add(listener); + } + + /** + * Registers a listener for a particular request id. + * + * @param reqid request id of interest + * @param listener listener to be registered + */ + public void register(String reqid, TypedMessageListener<T> listener) { + if (Strings.isNullOrEmpty(reqid)) { + throw new IllegalArgumentException("attempt to register a listener with an empty request id"); + } + + req2listener.put(reqid, listener); + } + + /** + * Unregisters a listener for autonomous messages. + * + * @param listener listener to be unregistered + */ + public void unregister(TypedMessageListener<T> listener) { + listeners.remove(listener); + } + + /** + * Unregisters the listener associated with a particular request id. + * + * @param reqid request id whose listener is to be unregistered + */ + public void unregister(String reqid) { + req2listener.remove(reqid); + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + + // extract the request id + String reqid = sco.getString(requestIdFieldNames); + + // dispatch the message + if (Strings.isNullOrEmpty(reqid)) { + // it's an autonomous message - offer it to all autonomous listeners + for (TypedMessageListener<T> listener : listeners) { + offerToListener(infra, topic, message, listener); + } + + } else { + // it's a response to a particular request + offerToListener(infra, topic, message, req2listener.get(reqid)); + } + } + + /** + * Offers a message to a listener. + * + * @param infra infrastructure on which the message was received + * @param topic topic on which the message was received + * @param msg message that was received + * @param listener listener to which the message should be offered, or {@code null} + */ + private void offerToListener(CommInfrastructure infra, String topic, T msg, TypedMessageListener<T> listener) { + + if (listener == null) { + return; + } + + try { + listener.onTopicEvent(infra, topic, msg); + + } catch (RuntimeException e) { + logger.warn("listener {} failed to process message: {}", listener, msg, e); + } + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java new file mode 100644 index 00000000..a3d33965 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/ScoListener.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for receipt of a {@link StandardCoderObject}, translating it into an object of + * the appropriate type, and then passing it to the subclass. + * + * @param <T> type of message/POJO this handles + */ +public abstract class ScoListener<T> { + + private static final Logger logger = LoggerFactory.getLogger(ScoListener.class); + + /** + * Used to translate the standard object to an object of type "T". + */ + private static final Coder coder = new StandardCoder(); + + /** + * Class of message this handles. + */ + private final Class<T> clazz; + + /** + * Constructs the object. + * + * @param clazz class of message this handles + */ + public ScoListener(Class<T> clazz) { + this.clazz = clazz; + } + + /** + * Receives an event, translates it into the desired type of object, and passes it to + * the subclass. + * + * @param infra infrastructure with which the message was received + * @param topic topic on which the message was received + * @param sco event that was received + */ + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) { + // translate the event to the desired object type + final T msg; + try { + msg = coder.fromStandard(sco, clazz); + + } catch (CoderException e) { + logger.warn("unable to decode {}: {}", clazz.getName(), sco, e); + return; + } + + onTopicEvent(infra, topic, sco, msg); + } + + /** + * Indicates that a message was received. + * + * @param infra infrastructure with which the message was received + * @param topic topic on which the message was received + * @param sco event that was received + * @param message message that was received + */ + public abstract void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message); +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java new file mode 100644 index 00000000..ab381205 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/listeners/TypedMessageListener.java @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; + +/** + * Listener for messages of a certain type. + * + * @param <T> type of message/POJO this handles + */ +public interface TypedMessageListener<T> { + + /** + * Handles receipt of a message. + * + * @param infra infrastructure with which the message was received + * @param topic topic on which the message was received + * @param message message that was received + */ + void onTopicEvent(CommInfrastructure infra, String topic, T message); +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java new file mode 100644 index 00000000..39fc9d8f --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/JsonListenerTest.java @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; + +public class JsonListenerTest { + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(JsonListener.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Original logging level for the logger. + */ + private static Level saveLevel; + + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TOPIC = "my-topic"; + private static final String JSON = "{'abc':'def'}".replace('\'', '"'); + + private JsonListener primary; + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveLevel = logger.getLevel(); + logger.setLevel(Level.INFO); + + appender.setContext(logger.getLoggerContext()); + appender.start(); + } + + @AfterClass + public static void tearDownAfterClass() { + logger.setLevel(saveLevel); + appender.stop(); + } + + /** + * Initializes mocks and a listener. + */ + @Before + public void setUp() { + appender.clearExtractions(); + + primary = new JsonListener() { + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco) {} + }; + } + + @After + public void tearDown() { + logger.detachAppender(appender); + } + + @Test + public void testOnTopicEvent() { + logger.addAppender(appender); + + primary = spy(primary); + + // success + primary.onTopicEvent(INFRA, TOPIC, JSON); + verify(primary).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + + // repeat + primary.onTopicEvent(INFRA, TOPIC, JSON); + verify(primary, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + + assertFalse(appender.getExtracted().toString().contains("unable to decode")); + + // invalid json - decode fails + appender.clearExtractions(); + primary.onTopicEvent(INFRA, TOPIC, "["); + assertTrue(appender.getExtracted().toString().contains("unable to decode")); + verify(primary, times(2)).onTopicEvent(any(), any(), any(StandardCoderObject.class)); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java new file mode 100644 index 00000000..0edcfe12 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/MessageTypeDispatcherTest.java @@ -0,0 +1,180 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; + +public class MessageTypeDispatcherTest { + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(MessageTypeDispatcher.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Original logging level for the logger. + */ + private static Level saveLevel; + + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TYPE_FIELD = "msg-type"; + private static final String TOPIC = "my-topic"; + private static final String TYPE1 = "msg-type-1"; + private static final String TYPE2 = "msg-type-2"; + + private MessageTypeDispatcher primary; + + private ScoListener<String> secondary1; + private ScoListener<String> secondary2; + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveLevel = logger.getLevel(); + logger.setLevel(Level.INFO); + + appender.setContext(logger.getLoggerContext()); + appender.start(); + } + + @AfterClass + public static void tearDownAfterClass() { + logger.setLevel(saveLevel); + appender.stop(); + } + + /** + * Initializes mocks and a listener. + */ + @Before + @SuppressWarnings("unchecked") + public void setUp() { + appender.clearExtractions(); + + secondary1 = mock(ScoListener.class); + secondary2 = mock(ScoListener.class); + + primary = new MessageTypeDispatcher(TYPE_FIELD); + } + + @After + public void tearDown() { + logger.detachAppender(appender); + } + + @Test + public void testRegister_testUnregister() { + primary.register(TYPE1, secondary1); + primary.register(TYPE2, secondary2); + + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + verify(secondary2, never()).onTopicEvent(any(), any(), any()); + + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + verify(secondary2, never()).onTopicEvent(any(), any(), any()); + + primary.unregister(TYPE1); + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + verify(secondary2, never()).onTopicEvent(any(), any(), any()); + + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + + // unregister again + primary.unregister(TYPE1); + + // unregister second type + primary.unregister(TYPE2); + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + verify(secondary2, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + } + + @Test + public void testOnTopicEvent() { + primary.register(TYPE1, secondary1); + + logger.addAppender(appender); + + // success + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + + // repeat + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE1)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), any(StandardCoderObject.class)); + + assertFalse(appender.getExtracted().toString().contains("unable to extract")); + assertFalse(appender.getExtracted().toString().contains("discarding event of type")); + + // no message type + appender.clearExtractions(); + primary.onTopicEvent(INFRA, TOPIC, "{}"); + assertTrue(appender.getExtracted().toString().contains("unable to extract")); + verify(secondary1, times(2)).onTopicEvent(any(), any(), any()); + + // unknown type + appender.clearExtractions(); + primary.onTopicEvent(INFRA, TOPIC, makeMessage(TYPE2)); + assertTrue(appender.getExtracted().toString().contains("discarding event of type")); + verify(secondary1, times(2)).onTopicEvent(any(), any(), any()); + } + + /** + * Makes a JSON message of the given type. + * + * @param msgType the message type + * @return a JSON message of the given type + */ + private String makeMessage(String msgType) { + String json = "{'" + TYPE_FIELD + "':'" + msgType + "', 'abc':'def'}"; + return json.replace('\'', '"'); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java new file mode 100644 index 00000000..6115a57e --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/RequestIdDispatcherTest.java @@ -0,0 +1,309 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; + +public class RequestIdDispatcherTest { + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(RequestIdDispatcher.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Original logging level for the logger. + */ + private static Level saveLevel; + + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String REQID_FIELD = "requestId"; + private static final String TOPIC = "my-topic"; + private static final String REQID1 = "request-1"; + private static final String REQID2 = "request-2"; + + private static final Coder coder = new StandardCoder(); + + private RequestIdDispatcher<MyMessage> primary; + private TypedMessageListener<MyMessage> secondary1; + private TypedMessageListener<MyMessage> secondary2; + private TypedMessageListener<MyMessage> secondary3; + private TypedMessageListener<MyMessage> secondary4; + private MyMessage status; + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveLevel = logger.getLevel(); + logger.setLevel(Level.INFO); + + appender.setContext(logger.getLoggerContext()); + appender.start(); + } + + @AfterClass + public static void tearDownAfterClass() { + logger.setLevel(saveLevel); + appender.stop(); + } + + /** + * Create various mocks and primary listener. + */ + @SuppressWarnings("unchecked") + @Before + public void setUp() { + appender.clearExtractions(); + + secondary1 = mock(TypedMessageListener.class); + secondary2 = mock(TypedMessageListener.class); + secondary3 = mock(TypedMessageListener.class); + secondary4 = mock(TypedMessageListener.class); + + primary = new RequestIdDispatcher<>(MyMessage.class, REQID_FIELD); + } + + @After + public void tearDown() { + logger.detachAppender(appender); + } + + @Test + public void testRegisterMessageListener() { + primary.register(secondary1); + + // should process message that does not have a request id + status = new MyMessage(); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // should process again + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // should NOT process a message that has a request id + status = new MyMessage(REQID1); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + } + + @Test + public void testRegisterStringMessageListener() { + primary.register(REQID1, secondary1); + + // should NOT process message that does not have a request id + status = new MyMessage(); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // should process a message that has the desired request id + status = new MyMessage(REQID1); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // should process again + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // should NOT process a message that does NOT have the desired request id + status = new MyMessage(REQID2); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // null request id => exception + assertThatIllegalArgumentException().isThrownBy(() -> primary.register(null, secondary1)); + + // empty request id => exception + assertThatIllegalArgumentException().isThrownBy(() -> primary.register("", secondary1)); + } + + @Test + public void testUnregisterMessageListener() { + primary.register(secondary1); + primary.register(secondary2); + + // should process message + status = new MyMessage(); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + primary.unregister(secondary1); + + // should NOT process again + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // other listener should still have processed it + verify(secondary2, times(2)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + } + + @Test + public void testUnregisterString() { + primary.register(REQID1, secondary1); + primary.register(REQID2, secondary2); + + // should process a message that has the desired request id + status = new MyMessage(REQID1); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + primary.unregister(REQID1); + + // should NOT re-process + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // secondary should still be able to process + status = new MyMessage(REQID2); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary2).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + } + + @Test + public void testOnTopicEvent() { + primary.register(REQID1, secondary1); + primary.register(REQID2, secondary2); + primary.register(secondary3); + primary.register(secondary4); + + // without request id + status = new MyMessage(); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary2, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary3).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary4).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + + // with request id + status = new MyMessage(REQID1); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + verify(secondary1).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary2, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary3, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + verify(secondary4, never()).onTopicEvent(eq(INFRA), eq(TOPIC), eq(status)); + } + + @Test + public void testOfferToListener() { + logger.addAppender(appender); + + // no listener for this + status = new MyMessage(REQID1); + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + + assertFalse(appender.getExtracted().toString().contains("failed to process message")); + + // listener throws an exception + primary.register(secondary1); + + status = new MyMessage(); + + RuntimeException ex = new RuntimeException("expected exception"); + doThrow(ex).when(secondary1).onTopicEvent(INFRA, TOPIC, status); + + primary.onTopicEvent(INFRA, TOPIC, makeSco(status)); + assertTrue(appender.getExtracted().toString().contains("failed to process message")); + } + + /** + * Makes a standard object from a status message. + * + * @param source message to be converted + * @return a standard object representing the message + */ + private StandardCoderObject makeSco(MyMessage source) { + try { + return coder.toStandard(source); + + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + protected static class MyMessage { + private String requestId; + + public MyMessage() { + super(); + } + + public MyMessage(String requestId) { + this.requestId = requestId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((requestId == null) ? 0 : requestId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MyMessage other = (MyMessage) obj; + if (requestId == null) { + if (other.requestId != null) { + return false; + } + } else if (!requestId.equals(other.requestId)) { + return false; + } + return true; + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java new file mode 100644 index 00000000..0749f891 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/listeners/ScoListenerTest.java @@ -0,0 +1,194 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.listeners; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.common.utils.test.log.logback.ExtractAppender; +import org.slf4j.LoggerFactory; + +public class ScoListenerTest { + + /** + * Used to attach an appender to the class' logger. + */ + private static final Logger logger = (Logger) LoggerFactory.getLogger(ScoListener.class); + private static final ExtractAppender appender = new ExtractAppender(); + + /** + * Original logging level for the logger. + */ + private static Level saveLevel; + + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TOPIC = "my-topic"; + private static final String NAME = "pdp_1"; + + private static final Coder coder = new StandardCoder(); + + private ScoListener<MyMessage> primary; + private MyMessage status; + private StandardCoderObject sco; + + /** + * Initializes statics. + */ + @BeforeClass + public static void setUpBeforeClass() { + saveLevel = logger.getLevel(); + logger.setLevel(Level.INFO); + + appender.setContext(logger.getLoggerContext()); + appender.start(); + } + + @AfterClass + public static void tearDownAfterClass() { + logger.setLevel(saveLevel); + appender.stop(); + } + + /** + * Create various mocks and primary handler. + */ + @Before + public void setUp() { + appender.clearExtractions(); + + primary = new ScoListener<MyMessage>(MyMessage.class) { + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, + MyMessage message) {} + }; + } + + @After + public void tearDown() { + logger.detachAppender(appender); + } + + @Test + public void testOnTopicEvent() { + primary = spy(primary); + + status = new MyMessage(NAME); + sco = makeSco(status); + primary.onTopicEvent(INFRA, TOPIC, sco); + verify(primary).onTopicEvent(eq(INFRA), eq(TOPIC), eq(sco), eq(status)); + + assertFalse(appender.getExtracted().toString().contains("unable to decode")); + + // undecodable message + logger.addAppender(appender); + primary.onTopicEvent(INFRA, TOPIC, makeSco("[]")); + verify(primary, times(1)).onTopicEvent(eq(INFRA), eq(TOPIC), eq(sco), eq(status)); + assertTrue(appender.getExtracted().toString().contains("unable to decode")); + } + + /** + * Makes a standard object from a JSON string. + * + * @param source message to be converted + * @return a standard object representing the message + */ + private StandardCoderObject makeSco(String source) { + try { + return coder.decode(source, StandardCoderObject.class); + + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + /** + * Makes a standard object from a status message. + * + * @param source message to be converted + * @return a standard object representing the message + */ + private StandardCoderObject makeSco(MyMessage source) { + try { + return coder.toStandard(source); + + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + protected static class MyMessage { + private String name; + + public MyMessage() { + super(); + } + + public MyMessage(String name) { + this.name = name; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MyMessage other = (MyMessage) obj; + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + return true; + } + } +} |