diff options
Diffstat (limited to 'policy-endpoints/src/test/java')
-rw-r--r-- | policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java index cfb65e16..2605c14b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.client; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -28,16 +29,24 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -46,11 +55,18 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; @RunWith(MockitoJUnitRunner.class) public class BidirectionalTopicClientTest { + private static final Coder coder = new StandardCoder(); + private static final long MAX_WAIT_MS = 5000; + private static final long SHORT_WAIT_MS = 1; private static final String SINK_TOPIC = "my-sink-topic"; private static final String SOURCE_TOPIC = "my-source-topic"; + private static final String MY_TEXT = "my-text"; private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB; private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP; @@ -64,7 +80,10 @@ public class BidirectionalTopicClientTest { @Mock private TopicListener listener; + private MyMessage theMessage; + private BidirectionalTopicClient client; + private Context context; /** * Configures the endpoints. @@ -104,7 +123,16 @@ public class BidirectionalTopicClientTest { when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList()); when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source)); + theMessage = new MyMessage(MY_TEXT); + client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC); + + context = new Context(); + } + + @After + public void tearDown() { + context.stop(); } @Test @@ -169,6 +197,151 @@ public class BidirectionalTopicClientTest { assertNotSame(source, client.getSource()); } + @Test + public void testAwaitReceipt() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + verify(source).register(any()); + verify(sink, atLeast(1)).send(any()); + assertThat(context.checker.isReady()).isFalse(); + + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_AlreadyDone() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + inject(theMessage); + + verifyReceipt(); + + // calling again should result in "true" again, without injecting message + context.start(theMessage); + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_MessageDoesNotMatch() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + // non-matching message + inject("{}"); + + // wait for a few more calls to "send" and then inject a matching message + assertThat(context.awaitSend(3)).isTrue(); + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_DecodeFails() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + // force a failure and inject the message + context.forceDecodeFailure = true; + inject(theMessage); + + assertThat(context.awaitDecodeFailure()).isTrue(); + + // no more failures + context.forceDecodeFailure = false; + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_Interrupted() throws InterruptedException { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + context.interrupt(); + + verifyNoReceipt(); + } + + @Test + public void testAwaitReceipt_MultipleLoops() throws Exception { + context.start(theMessage); + + // wait for multiple "send" calls + assertThat(context.awaitSend(3)).isTrue(); + + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testStop() throws InterruptedException { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + context.stop(); + + verifyNoReceipt(); + } + + /** + * Verifies that awaitReceipt() returns {@code true}. + * + * @throws InterruptedException if interrupted while waiting for the thread to + * terminate + */ + private void verifyReceipt() throws InterruptedException { + assertThat(context.join()).isTrue(); + assertThat(context.result).isTrue(); + assertThat(context.exception).isNull(); + assertThat(context.checker.isReady()).isTrue(); + + verify(source).unregister(any()); + } + + /** + * Verifies that awaitReceipt() returns {@code false}. + * + * @throws InterruptedException if interrupted while waiting for the thread to + * terminate + */ + private void verifyNoReceipt() throws InterruptedException { + assertThat(context.join()).isTrue(); + assertThat(context.result).isFalse(); + assertThat(context.exception).isNull(); + assertThat(context.checker.isReady()).isFalse(); + + verify(source).unregister(any()); + } + + /** + * Injects a message into the source topic. + * + * @param message message to be injected + * @throws CoderException if the message cannot be encoded + */ + private void inject(MyMessage message) throws CoderException { + inject(coder.encode(message)); + } + + /** + * Injects a message into the source topic. + * + * @param message message to be injected + */ + private void inject(String message) { + ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class); + verify(source).register(cap.capture()); + + cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message); + } + /** * BidirectionalTopicClient with some overrides. @@ -185,4 +358,95 @@ public class BidirectionalTopicClientTest { return endpoint; } } + + private class Context { + private Thread thread; + private boolean result; + private Exception exception; + private boolean forceDecodeFailure; + + // released every time the checker publishes a message + private final Semaphore sendSem = new Semaphore(0); + + // released every time a message-decode fails + private final Semaphore decodeFailedSem = new Semaphore(0); + + private final BidirectionalTopicClient2 checker; + + public Context() throws BidirectionalTopicClientException { + + checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) { + + @Override + public boolean send(String messageText) { + boolean result = super.send(messageText); + sendSem.release(); + return result; + } + + @Override + protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException { + if (forceDecodeFailure) { + throw new CoderException("expected exception"); + } + + return super.decode(msg, clazz); + } + + @Override + protected void decodeFailed() { + super.decodeFailed(); + decodeFailedSem.release(); + } + }; + } + + /** + * Starts the thread. + * + * @param message message to be sent to the sink topic + */ + public void start(MyMessage message) { + thread = new Thread() { + @Override + public void run() { + try { + result = checker.awaitReady(message, SHORT_WAIT_MS); + } catch (Exception e) { + exception = e; + } + } + }; + thread.setDaemon(true); + thread.start(); + } + + public void stop() { + checker.stopWaiting(); + } + + public boolean join() throws InterruptedException { + thread.join(MAX_WAIT_MS); + return !thread.isAlive(); + } + + public void interrupt() { + thread.interrupt(); + } + + public boolean awaitSend(int npermits) throws InterruptedException { + return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + + public boolean awaitDecodeFailure() throws InterruptedException { + return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class MyMessage { + private String text; + } } |