summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java117
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java264
2 files changed, 380 insertions, 1 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index 57a331c1..2a9f1446 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,9 @@ package org.onap.policy.common.endpoints.event.comm.client;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -29,6 +32,11 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A "bidirectional" topic, which is a pair of topics, one of which is used to publish
@@ -36,6 +44,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
*/
@Getter
public class BidirectionalTopicClient {
+ private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
+ private static final Coder coder = new StandardCoder();
+
private final String sinkTopic;
private final String sourceTopic;
private final TopicSink sink;
@@ -44,6 +55,16 @@ public class BidirectionalTopicClient {
private final CommInfrastructure sourceTopicCommInfrastructure;
/**
+ * Used when checking whether or not a message sent on the sink topic can be received
+ * on the source topic. When a matching message is received on the incoming topic,
+ * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
+ * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
+ * is pulled from the queue, it is immediately placed back on the queue.
+ */
+ private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();
+
+
+ /**
* Constructs the object.
*
* @param sinkTopic sink topic name
@@ -94,9 +115,103 @@ public class BidirectionalTopicClient {
source.unregister(topicListener);
}
+ /**
+ * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
+ * previously returned {@code true}).
+ *
+ * @return {@code true}, if the topic is ready to send and receive
+ */
+ public boolean isReady() {
+ return Boolean.TRUE.equals(checkerQueue.peek());
+ }
+
+ /**
+ * Waits for the bidirectional topic to become "ready" by publishing a message on the
+ * sink topic and awaiting receipt of the message on the source topic. If the message
+ * is not received within a few seconds, then it tries again. This process is
+ * continued until the message is received, {@link #stop()} is called, or this thread
+ * is interrupted. Once this returns, subsequent calls will return immediately, always
+ * with the same value.
+ *
+ * @param message message to be sent to the sink topic. Note: the equals() method must
+ * return {@code true} if and only if two messages are the same
+ * @param waitMs time to wait, in milliseconds, before re-sending the message
+ * @return {@code true} if the message was received from the source topic,
+ * {@code false} if this method was stopped or interrupted before receipt of
+ * the message
+ * @throws CoderException if the message cannot be encoded
+ */
+ public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
+ // see if we already know the answer
+ if (!checkerQueue.isEmpty()) {
+ return checkerQueue.peek();
+ }
+
+ final String messageText = coder.encode(message);
+
+ // class of message to be decoded
+ @SuppressWarnings("unchecked")
+ final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
+
+ // create a listener to detect when a matching message is received
+ final TopicListener listener = (infra, topic, msg) -> {
+ try {
+ T incoming = decode(msg, clazz);
+
+ if (message.equals(incoming)) {
+ logger.info("topic {} is ready; found matching message {}", topic, incoming);
+ checkerQueue.add(Boolean.TRUE);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot decode message from topic {}", topic, e);
+ decodeFailed();
+ }
+ };
+
+ source.register(listener);
+
+ // loop until the message is received
+ try {
+ Boolean result;
+ do {
+ send(messageText);
+ } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);
+
+ // put it back on the queue
+ checkerQueue.add(result);
+
+ } catch (InterruptedException e) {
+ logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
+ Thread.currentThread().interrupt();
+ checkerQueue.add(Boolean.FALSE);
+
+ } finally {
+ source.unregister(listener);
+ }
+
+ return checkerQueue.peek();
+ }
+
+ /**
+ * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
+ * adding {@code false} to the queue.
+ */
+ public void stopWaiting() {
+ checkerQueue.add(Boolean.FALSE);
+ }
+
// these may be overridden by junit tests
protected TopicEndpoint getTopicEndpointManager() {
return TopicEndpointManager.getManager();
}
+
+ protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
+ return coder.decode(msg, clazz);
+ }
+
+ protected void decodeFailed() {
+ // already logged - nothing else to do
+ }
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
index cfb65e16..2605c14b 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.client;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -28,16 +29,24 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -46,11 +55,18 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
@RunWith(MockitoJUnitRunner.class)
public class BidirectionalTopicClientTest {
+ private static final Coder coder = new StandardCoder();
+ private static final long MAX_WAIT_MS = 5000;
+ private static final long SHORT_WAIT_MS = 1;
private static final String SINK_TOPIC = "my-sink-topic";
private static final String SOURCE_TOPIC = "my-source-topic";
+ private static final String MY_TEXT = "my-text";
private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
@@ -64,7 +80,10 @@ public class BidirectionalTopicClientTest {
@Mock
private TopicListener listener;
+ private MyMessage theMessage;
+
private BidirectionalTopicClient client;
+ private Context context;
/**
* Configures the endpoints.
@@ -104,7 +123,16 @@ public class BidirectionalTopicClientTest {
when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
+ theMessage = new MyMessage(MY_TEXT);
+
client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
+
+ context = new Context();
+ }
+
+ @After
+ public void tearDown() {
+ context.stop();
}
@Test
@@ -169,6 +197,151 @@ public class BidirectionalTopicClientTest {
assertNotSame(source, client.getSource());
}
+ @Test
+ public void testAwaitReceipt() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ verify(source).register(any());
+ verify(sink, atLeast(1)).send(any());
+ assertThat(context.checker.isReady()).isFalse();
+
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_AlreadyDone() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ inject(theMessage);
+
+ verifyReceipt();
+
+ // calling again should result in "true" again, without injecting message
+ context.start(theMessage);
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ // non-matching message
+ inject("{}");
+
+ // wait for a few more calls to "send" and then inject a matching message
+ assertThat(context.awaitSend(3)).isTrue();
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_DecodeFails() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ // force a failure and inject the message
+ context.forceDecodeFailure = true;
+ inject(theMessage);
+
+ assertThat(context.awaitDecodeFailure()).isTrue();
+
+ // no more failures
+ context.forceDecodeFailure = false;
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_Interrupted() throws InterruptedException {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ context.interrupt();
+
+ verifyNoReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_MultipleLoops() throws Exception {
+ context.start(theMessage);
+
+ // wait for multiple "send" calls
+ assertThat(context.awaitSend(3)).isTrue();
+
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testStop() throws InterruptedException {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ context.stop();
+
+ verifyNoReceipt();
+ }
+
+ /**
+ * Verifies that awaitReceipt() returns {@code true}.
+ *
+ * @throws InterruptedException if interrupted while waiting for the thread to
+ * terminate
+ */
+ private void verifyReceipt() throws InterruptedException {
+ assertThat(context.join()).isTrue();
+ assertThat(context.result).isTrue();
+ assertThat(context.exception).isNull();
+ assertThat(context.checker.isReady()).isTrue();
+
+ verify(source).unregister(any());
+ }
+
+ /**
+ * Verifies that awaitReceipt() returns {@code false}.
+ *
+ * @throws InterruptedException if interrupted while waiting for the thread to
+ * terminate
+ */
+ private void verifyNoReceipt() throws InterruptedException {
+ assertThat(context.join()).isTrue();
+ assertThat(context.result).isFalse();
+ assertThat(context.exception).isNull();
+ assertThat(context.checker.isReady()).isFalse();
+
+ verify(source).unregister(any());
+ }
+
+ /**
+ * Injects a message into the source topic.
+ *
+ * @param message message to be injected
+ * @throws CoderException if the message cannot be encoded
+ */
+ private void inject(MyMessage message) throws CoderException {
+ inject(coder.encode(message));
+ }
+
+ /**
+ * Injects a message into the source topic.
+ *
+ * @param message message to be injected
+ */
+ private void inject(String message) {
+ ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
+ verify(source).register(cap.capture());
+
+ cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
+ }
+
/**
* BidirectionalTopicClient with some overrides.
@@ -185,4 +358,95 @@ public class BidirectionalTopicClientTest {
return endpoint;
}
}
+
+ private class Context {
+ private Thread thread;
+ private boolean result;
+ private Exception exception;
+ private boolean forceDecodeFailure;
+
+ // released every time the checker publishes a message
+ private final Semaphore sendSem = new Semaphore(0);
+
+ // released every time a message-decode fails
+ private final Semaphore decodeFailedSem = new Semaphore(0);
+
+ private final BidirectionalTopicClient2 checker;
+
+ public Context() throws BidirectionalTopicClientException {
+
+ checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
+
+ @Override
+ public boolean send(String messageText) {
+ boolean result = super.send(messageText);
+ sendSem.release();
+ return result;
+ }
+
+ @Override
+ protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
+ if (forceDecodeFailure) {
+ throw new CoderException("expected exception");
+ }
+
+ return super.decode(msg, clazz);
+ }
+
+ @Override
+ protected void decodeFailed() {
+ super.decodeFailed();
+ decodeFailedSem.release();
+ }
+ };
+ }
+
+ /**
+ * Starts the thread.
+ *
+ * @param message message to be sent to the sink topic
+ */
+ public void start(MyMessage message) {
+ thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ result = checker.awaitReady(message, SHORT_WAIT_MS);
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void stop() {
+ checker.stopWaiting();
+ }
+
+ public boolean join() throws InterruptedException {
+ thread.join(MAX_WAIT_MS);
+ return !thread.isAlive();
+ }
+
+ public void interrupt() {
+ thread.interrupt();
+ }
+
+ public boolean awaitSend(int npermits) throws InterruptedException {
+ return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+
+ public boolean awaitDecodeFailure() throws InterruptedException {
+ return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class MyMessage {
+ private String text;
+ }
}