diff options
Diffstat (limited to 'policy-endpoints')
2 files changed, 380 insertions, 1 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java index 57a331c1..2a9f1446 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,9 @@ package org.onap.policy.common.endpoints.event.comm.client; import java.util.Arrays; import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -29,6 +32,11 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A "bidirectional" topic, which is a pair of topics, one of which is used to publish @@ -36,6 +44,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; */ @Getter public class BidirectionalTopicClient { + private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class); + private static final Coder coder = new StandardCoder(); + private final String sinkTopic; private final String sourceTopic; private final TopicSink sink; @@ -44,6 +55,16 @@ public class BidirectionalTopicClient { private final CommInfrastructure sourceTopicCommInfrastructure; /** + * Used when checking whether or not a message sent on the sink topic can be received + * on the source topic. When a matching message is received on the incoming topic, + * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting + * thread is interrupted, then {@code false} is placed on the queue. Whenever a value + * is pulled from the queue, it is immediately placed back on the queue. + */ + private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>(); + + + /** * Constructs the object. * * @param sinkTopic sink topic name @@ -94,9 +115,103 @@ public class BidirectionalTopicClient { source.unregister(topicListener); } + /** + * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has + * previously returned {@code true}). + * + * @return {@code true}, if the topic is ready to send and receive + */ + public boolean isReady() { + return Boolean.TRUE.equals(checkerQueue.peek()); + } + + /** + * Waits for the bidirectional topic to become "ready" by publishing a message on the + * sink topic and awaiting receipt of the message on the source topic. If the message + * is not received within a few seconds, then it tries again. This process is + * continued until the message is received, {@link #stop()} is called, or this thread + * is interrupted. Once this returns, subsequent calls will return immediately, always + * with the same value. + * + * @param message message to be sent to the sink topic. Note: the equals() method must + * return {@code true} if and only if two messages are the same + * @param waitMs time to wait, in milliseconds, before re-sending the message + * @return {@code true} if the message was received from the source topic, + * {@code false} if this method was stopped or interrupted before receipt of + * the message + * @throws CoderException if the message cannot be encoded + */ + public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException { + // see if we already know the answer + if (!checkerQueue.isEmpty()) { + return checkerQueue.peek(); + } + + final String messageText = coder.encode(message); + + // class of message to be decoded + @SuppressWarnings("unchecked") + final Class<? extends T> clazz = (Class<? extends T>) message.getClass(); + + // create a listener to detect when a matching message is received + final TopicListener listener = (infra, topic, msg) -> { + try { + T incoming = decode(msg, clazz); + + if (message.equals(incoming)) { + logger.info("topic {} is ready; found matching message {}", topic, incoming); + checkerQueue.add(Boolean.TRUE); + } + + } catch (CoderException e) { + logger.warn("cannot decode message from topic {}", topic, e); + decodeFailed(); + } + }; + + source.register(listener); + + // loop until the message is received + try { + Boolean result; + do { + send(messageText); + } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null); + + // put it back on the queue + checkerQueue.add(result); + + } catch (InterruptedException e) { + logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e); + Thread.currentThread().interrupt(); + checkerQueue.add(Boolean.FALSE); + + } finally { + source.unregister(listener); + } + + return checkerQueue.peek(); + } + + /** + * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by + * adding {@code false} to the queue. + */ + public void stopWaiting() { + checkerQueue.add(Boolean.FALSE); + } + // these may be overridden by junit tests protected TopicEndpoint getTopicEndpointManager() { return TopicEndpointManager.getManager(); } + + protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException { + return coder.decode(msg, clazz); + } + + protected void decodeFailed() { + // already logged - nothing else to do + } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java index cfb65e16..2605c14b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.client; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -28,16 +29,24 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -46,11 +55,18 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; @RunWith(MockitoJUnitRunner.class) public class BidirectionalTopicClientTest { + private static final Coder coder = new StandardCoder(); + private static final long MAX_WAIT_MS = 5000; + private static final long SHORT_WAIT_MS = 1; private static final String SINK_TOPIC = "my-sink-topic"; private static final String SOURCE_TOPIC = "my-source-topic"; + private static final String MY_TEXT = "my-text"; private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB; private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP; @@ -64,7 +80,10 @@ public class BidirectionalTopicClientTest { @Mock private TopicListener listener; + private MyMessage theMessage; + private BidirectionalTopicClient client; + private Context context; /** * Configures the endpoints. @@ -104,7 +123,16 @@ public class BidirectionalTopicClientTest { when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList()); when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source)); + theMessage = new MyMessage(MY_TEXT); + client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC); + + context = new Context(); + } + + @After + public void tearDown() { + context.stop(); } @Test @@ -169,6 +197,151 @@ public class BidirectionalTopicClientTest { assertNotSame(source, client.getSource()); } + @Test + public void testAwaitReceipt() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + verify(source).register(any()); + verify(sink, atLeast(1)).send(any()); + assertThat(context.checker.isReady()).isFalse(); + + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_AlreadyDone() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + inject(theMessage); + + verifyReceipt(); + + // calling again should result in "true" again, without injecting message + context.start(theMessage); + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_MessageDoesNotMatch() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + // non-matching message + inject("{}"); + + // wait for a few more calls to "send" and then inject a matching message + assertThat(context.awaitSend(3)).isTrue(); + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_DecodeFails() throws Exception { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + // force a failure and inject the message + context.forceDecodeFailure = true; + inject(theMessage); + + assertThat(context.awaitDecodeFailure()).isTrue(); + + // no more failures + context.forceDecodeFailure = false; + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testAwaitReceipt_Interrupted() throws InterruptedException { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + context.interrupt(); + + verifyNoReceipt(); + } + + @Test + public void testAwaitReceipt_MultipleLoops() throws Exception { + context.start(theMessage); + + // wait for multiple "send" calls + assertThat(context.awaitSend(3)).isTrue(); + + inject(theMessage); + + verifyReceipt(); + } + + @Test + public void testStop() throws InterruptedException { + context.start(theMessage); + assertThat(context.awaitSend(1)).isTrue(); + + context.stop(); + + verifyNoReceipt(); + } + + /** + * Verifies that awaitReceipt() returns {@code true}. + * + * @throws InterruptedException if interrupted while waiting for the thread to + * terminate + */ + private void verifyReceipt() throws InterruptedException { + assertThat(context.join()).isTrue(); + assertThat(context.result).isTrue(); + assertThat(context.exception).isNull(); + assertThat(context.checker.isReady()).isTrue(); + + verify(source).unregister(any()); + } + + /** + * Verifies that awaitReceipt() returns {@code false}. + * + * @throws InterruptedException if interrupted while waiting for the thread to + * terminate + */ + private void verifyNoReceipt() throws InterruptedException { + assertThat(context.join()).isTrue(); + assertThat(context.result).isFalse(); + assertThat(context.exception).isNull(); + assertThat(context.checker.isReady()).isFalse(); + + verify(source).unregister(any()); + } + + /** + * Injects a message into the source topic. + * + * @param message message to be injected + * @throws CoderException if the message cannot be encoded + */ + private void inject(MyMessage message) throws CoderException { + inject(coder.encode(message)); + } + + /** + * Injects a message into the source topic. + * + * @param message message to be injected + */ + private void inject(String message) { + ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class); + verify(source).register(cap.capture()); + + cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message); + } + /** * BidirectionalTopicClient with some overrides. @@ -185,4 +358,95 @@ public class BidirectionalTopicClientTest { return endpoint; } } + + private class Context { + private Thread thread; + private boolean result; + private Exception exception; + private boolean forceDecodeFailure; + + // released every time the checker publishes a message + private final Semaphore sendSem = new Semaphore(0); + + // released every time a message-decode fails + private final Semaphore decodeFailedSem = new Semaphore(0); + + private final BidirectionalTopicClient2 checker; + + public Context() throws BidirectionalTopicClientException { + + checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) { + + @Override + public boolean send(String messageText) { + boolean result = super.send(messageText); + sendSem.release(); + return result; + } + + @Override + protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException { + if (forceDecodeFailure) { + throw new CoderException("expected exception"); + } + + return super.decode(msg, clazz); + } + + @Override + protected void decodeFailed() { + super.decodeFailed(); + decodeFailedSem.release(); + } + }; + } + + /** + * Starts the thread. + * + * @param message message to be sent to the sink topic + */ + public void start(MyMessage message) { + thread = new Thread() { + @Override + public void run() { + try { + result = checker.awaitReady(message, SHORT_WAIT_MS); + } catch (Exception e) { + exception = e; + } + } + }; + thread.setDaemon(true); + thread.start(); + } + + public void stop() { + checker.stopWaiting(); + } + + public boolean join() throws InterruptedException { + thread.join(MAX_WAIT_MS); + return !thread.isAlive(); + } + + public void interrupt() { + thread.interrupt(); + } + + public boolean awaitSend(int npermits) throws InterruptedException { + return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + + public boolean awaitDecodeFailure() throws InterruptedException { + return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class MyMessage { + private String text; + } } |