diff options
5 files changed, 348 insertions, 1 deletions
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml index 1d8ae53c..53e2186f 100644 --- a/policy-endpoints/pom.xml +++ b/policy-endpoints/pom.xml @@ -19,7 +19,8 @@ ============LICENSE_END========================================================= --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -205,6 +206,12 @@ <artifactId>openpojo</artifactId> </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> <build> diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java new file mode 100644 index 00000000..3188b8d4 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + + +import java.util.List; + +import lombok.Getter; + +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client for sending messages to a Topic using TopicSink. + */ +public class TopicSinkClient { + private static final Logger logger = LoggerFactory.getLogger(TopicSinkClient.class); + + /** + * Coder used to encode messages being sent to the topic. + */ + private static final Coder CODER = new StandardCoder(); + + /** + * Topic to which messages are published. + */ + @Getter + private final String topic; + + /** + * Where messages are published. + */ + private final TopicSink sink; + + /** + * Constructs the object. + * + * @param topic topic to which messages should be published + * @throws TopicSinkClientException if the topic does not exist + */ + public TopicSinkClient(final String topic) throws TopicSinkClientException { + this.topic = topic; + + final List<TopicSink> lst = getTopicSinks(topic); + if (lst.isEmpty()) { + throw new TopicSinkClientException("no sinks for topic: " + topic); + } + + this.sink = lst.get(0); + } + + /** + * Sends a message to the topic, after encoding the message as json. + * + * @param message message to be encoded and sent + * @return {@code true} if the message was successfully sent/enqueued, {@code false} otherwise + */ + public boolean send(final Object message) { + try { + final String json = CODER.encode(message); + return sink.send(json); + + } catch (RuntimeException | CoderException e) { + logger.warn("send to {} failed because of {}", topic, e.getMessage(), e); + return false; + } + } + + // the remaining methods are wrappers that can be overridden by junit tests + + /** + * Gets the sinks for a given topic. + * + * @param topic the topic of interest + * @return the sinks for the topic + */ + protected List<TopicSink> getTopicSinks(final String topic) { + return TopicEndpoint.manager.getTopicSinks(topic); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java new file mode 100644 index 00000000..608393b3 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +/** + * Exception thrown by TopicSink client classes. + */ +public class TopicSinkClientException extends Exception { + private static final long serialVersionUID = 1L; + + public TopicSinkClientException() { + super(); + } + + public TopicSinkClientException(final String message) { + super(message); + } + + public TopicSinkClientException(final Throwable cause) { + super(cause); + } + + public TopicSinkClientException(final String message, final Throwable cause) { + super(message, cause); + } + + public TopicSinkClientException(final String message, final Throwable cause, final boolean enableSuppression, + final boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java new file mode 100644 index 00000000..c0814703 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; + +public class TopicSinkClientExceptionTest { + + @Test + public void test() { + assertEquals(5, new ExceptionsTester().test(TopicSinkClientException.class)); + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java new file mode 100644 index 00000000..725c0418 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java @@ -0,0 +1,150 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.client; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.TopicSink; + +public class TopicSinkClientTest { + private static final String SINK_FIELD_NAME = "sink"; + private static final String TOPIC = "my-topic"; + + private TopicSinkClient client; + private TopicSink sink; + private List<TopicSink> sinks; + + /** + * Creates mocks and an initial client object. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + sink = mock(TopicSink.class); + when(sink.send(anyString())).thenReturn(true); + + sinks = Arrays.asList(sink, null); + + client = new TopicSinkClient2(TOPIC); + } + + @AfterClass + public static void tearDown() throws Exception { + // clear all topics after the tests + TopicEndpoint.manager.shutdown(); + } + + /** + * Uses a real NO-OP topic sink. + */ + @Test + public void testGetTopicSinks() throws Exception { + // clear all topics and then configure one topic + TopicEndpoint.manager.shutdown(); + + final Properties props = new Properties(); + props.setProperty("noop.sink.topics", TOPIC); + TopicEndpoint.manager.addTopicSinks(props); + + sink = TopicEndpoint.manager.getNoopTopicSink(TOPIC); + assertNotNull(sink); + + final AtomicReference<String> evref = new AtomicReference<>(null); + + sink.register(new TopicListener() { + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final String event) { + evref.set(event); + } + }); + + sink.start(); + + client = new TopicSinkClient(TOPIC); + client.send(100); + + assertEquals("100", evref.get()); + } + + @Test + public void testTopicSinkClient_testGetTopic() { + assertEquals(TOPIC, client.getTopic()); + assertSame(sink, Whitebox.getInternalState(client, SINK_FIELD_NAME)); + + // unknown topic -> should throw exception + sinks = new LinkedList<>(); + assertThatThrownBy(() -> new TopicSinkClient2(TOPIC)).isInstanceOf(TopicSinkClientException.class) + .hasMessage("no sinks for topic: my-topic"); + } + + @Test + public void testSend() throws Exception { + client.send(Arrays.asList("abc", "def")); + verify(sink).send("['abc','def']".replace('\'', '"')); + + // sink send fails + when(sink.send(anyString())).thenReturn(false); + assertFalse(client.send("ghi")); + + // sink send throws an exception + final RuntimeException ex = new RuntimeException("expected exception"); + when(sink.send(anyString())).thenThrow(ex); + assertFalse(client.send("jkl")); + } + + /** + * TopicSinkClient with some overrides. + */ + private class TopicSinkClient2 extends TopicSinkClient { + + public TopicSinkClient2(final String topic) throws TopicSinkClientException { + super(topic); + } + + @Override + protected List<TopicSink> getTopicSinks(final String topic) { + return sinks; + } + } +} |