aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java55
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java164
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java21
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java28
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java14
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java36
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java20
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java31
8 files changed, 177 insertions, 192 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
index 91fbad18..bd75995f 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,7 +27,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.drools.utils.Pair;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +35,11 @@ import org.slf4j.LoggerFactory;
* Extractors for each object class. Properties define how the data is to be
* extracted for a given class, where the properties are similar to the
* following:
- *
+ *
* <pre>
* <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
* </pre>
- *
+ *
* <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
* method to extract the specified field. If that fails, then it looks for a public field
* by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
@@ -71,7 +72,7 @@ public class ClassExtractors {
/**
* Constructor.
- *
+ *
* @param props properties that specify how the data is to be extracted from
* a given class
* @param prefix property name prefix, prepended before the class name
@@ -85,7 +86,7 @@ public class ClassExtractors {
/**
* Gets the number of extractors in the map.
- *
+ *
* @return gets the number of extractors in the map
*/
protected int size() {
@@ -94,7 +95,7 @@ public class ClassExtractors {
/**
* Extracts the desired data item from an object.
- *
+ *
* @param object object from which to extract the data item
* @return the extracted item, or {@code null} if it could not be extracted
*/
@@ -111,7 +112,7 @@ public class ClassExtractors {
/**
* Gets the extractor for the given type of object, creating one if it
* doesn't exist yet.
- *
+ *
* @param object object whose extracted is desired
* @return an extractor for the object
*/
@@ -129,9 +130,9 @@ public class ClassExtractors {
/**
* Builds an extractor for the class.
- *
+ *
* @param clazz class for which the extractor should be built
- *
+ *
* @return a new extractor
*/
private Extractor buildExtractor(Class<?> clazz) {
@@ -162,7 +163,7 @@ public class ClassExtractors {
/**
* Builds an extractor for the class, based on the config value extracted
* from the corresponding property.
- *
+ *
* @param clazz class for which the extractor should be built
* @param value config value (e.g., "${event.request.id}"
* @return a new extractor
@@ -209,7 +210,7 @@ public class ClassExtractors {
/**
* Gets the extractor for a class, examining all super classes and
* interfaces.
- *
+ *
* @param clazz class whose extractor is desired
* @param addOk {@code true} if the extractor may be added, provided the
* property is defined, {@code false} otherwise
@@ -268,7 +269,7 @@ public class ClassExtractors {
* hierarchically, where each name identifies a particular component within
* the hierarchy. Supports retrieval from {@link Map} objects, as well as
* via getXxx() methods, or by direct field retrieval.
- *
+ *
* <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
*/
private class ComponetizedExtractor implements Extractor {
@@ -280,7 +281,7 @@ public class ClassExtractors {
/**
* Constructor.
- *
+ *
* @param clazz the class associated with the object at the root of the
* hierarchy
* @param names name associated with each component
@@ -296,14 +297,14 @@ public class ClassExtractors {
Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
- extractors[x] = pair.first();
- clz = pair.second();
+ extractors[x] = pair.getLeft();
+ clz = pair.getRight();
}
}
/**
* Builds an extractor for the given component of an object.
- *
+ *
* @param clazz type of object from which the component will be
* extracted
* @param comp name of the component to extract
@@ -312,9 +313,9 @@ public class ClassExtractors {
* @throws ExtractorException extrator exception
*/
private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
+
Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
+
if (pair == null) {
pair = getFieldExtractor(clazz, comp);
}
@@ -350,7 +351,7 @@ public class ClassExtractors {
/**
* Gets an extractor that invokes a getXxx() method to retrieve the
* object.
- *
+ *
* @param clazz container's class
* @param name name of the property to be retrieved
* @return a new extractor, or {@code null} if the class does not
@@ -371,7 +372,7 @@ public class ClassExtractors {
return null;
}
- return new Pair<>(new MethodExtractor(meth), retType);
+ return Pair.of(new MethodExtractor(meth), retType);
} catch (NoSuchMethodException expected) {
// no getXxx() method, maybe there's a field by this name
@@ -385,7 +386,7 @@ public class ClassExtractors {
/**
* Gets an extractor for a field within the object.
- *
+ *
* @param clazz container's class
* @param name name of the field whose value is to be extracted
* @return a new extractor, or {@code null} if the class does not
@@ -399,12 +400,12 @@ public class ClassExtractors {
return null;
}
- return new Pair<>(new FieldExtractor(field), field.getType());
+ return Pair.of(new FieldExtractor(field), field.getType());
}
/**
* Gets an extractor for an item within a Map object.
- *
+ *
* @param clazz container's class
* @param key item key within the map
* @return a new extractor, or {@code null} if the class is not a Map
@@ -423,13 +424,13 @@ public class ClassExtractors {
* If the object turns out not to be a map, then the MapExtractor
* for the next component will just return null.
*/
- return new Pair<>(new MapExtractor(key), Map.class);
+ return Pair.of(new MapExtractor(key), Map.class);
}
/**
* Gets field within a class, examining all super classes and
* interfaces.
- *
+ *
* @param clazz class whose field is desired
* @param name name of the desired field
* @return the field within the class, or {@code null} if the field does
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index c35e525a..96b358da 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +63,6 @@ import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
-import org.onap.policy.drools.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,25 +80,21 @@ import org.slf4j.LoggerFactory;
*
* <p>Invoke {@link #runSlow()}, before the test, to slow things down.
*/
-public class FeatureTest {
+public class FeatureTest {
private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
-
/**
* Name of the topic used for inter-host communication.
*/
private static final String INTERNAL_TOPIC = "my.internal.topic";
-
/**
* Name of the topic from which "external" events "arrive".
*/
private static final String EXTERNAL_TOPIC = "my.external.topic";
-
/**
* Name of the controller.
*/
private static final String CONTROLLER1 = "controller.one";
-
private static long stdReactivateWaitMs = 200;
private static long stdIdentificationMs = 60;
private static long stdStartHeartbeatMs = 60;
@@ -111,28 +108,26 @@ public class FeatureTest {
* Used to decode events from the external topic.
*/
private static final Gson mapper = new Gson();
-
/**
* Used to identify the current context.
*/
private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
-
/**
* Context for the current test case.
*/
private Context ctx;
-
/**
* Setup.
*/
+
@Before
public void setUp() {
ctx = null;
}
-
/**
* Tear down.
*/
+
@After
public void tearDown() {
if (ctx != null) {
@@ -157,19 +152,14 @@ public class FeatureTest {
private void run(int nmessages, int nhosts) throws Exception {
ctx = new Context(nmessages);
-
for (int x = 0; x < nhosts; ++x) {
ctx.addHost();
}
-
ctx.startHosts();
-
for (int x = 0; x < nmessages; ++x) {
ctx.offerExternal(makeMessage(x));
}
-
ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
-
assertEquals(0, ctx.getDecodeErrors());
assertEquals(0, ctx.getRemainingEvents());
ctx.checkAllSawAMsg();
@@ -178,10 +168,10 @@ public class FeatureTest {
private String makeMessage(int reqnum) {
return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
}
-
/**
* Invoke this to slow the timers down.
*/
+
protected static void runSlow() {
stdReactivateWaitMs = 10000;
stdIdentificationMs = 10000;
@@ -193,58 +183,50 @@ public class FeatureTest {
stdInterPollMs = 2000;
stdEventWaitSec = 1000;
}
-
/**
* Decodes an event.
*
* @param event event
* @return the decoded event, or {@code null} if it cannot be decoded
*/
+
private static Object decodeEvent(String event) {
try {
return mapper.fromJson(event, TreeMap.class);
-
} catch (JsonParseException e) {
logger.warn("cannot decode external event", e);
return null;
}
}
-
/**
* Context used for a single test case.
*/
- private static class Context {
+ private static class Context {
/**
* Hosts that have been added to this context.
*/
private final Deque<Host> hosts = new LinkedList<>();
-
/**
* Maps a drools controller to its policy controller.
*/
private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
-
/**
* Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
*/
private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
-
/**
* Queue for the external "DMaaP" topic.
*/
private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
-
/**
* Counts the number of decode errors.
*/
private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
-
/**
* Number of events we're still waiting to receive.
*/
private final CountDownLatch eventCounter;
-
/**
* The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
* {@link #getCurrentHost()}.
@@ -256,13 +238,14 @@ public class FeatureTest {
*
* @param nEvents number of events to be processed
*/
+
public Context(int events) {
eventCounter = new CountDownLatch(events);
}
-
/**
* Destroys the context, stopping any hosts that remain.
*/
+
public void destroy() {
stopHosts();
hosts.clear();
@@ -273,16 +256,17 @@ public class FeatureTest {
*
* @return the new Host
*/
+
public Host addHost() {
Host host = new Host(this);
hosts.add(host);
-
return host;
}
/**
* Starts the hosts.
*/
+
public void startHosts() {
hosts.forEach(host -> host.start());
}
@@ -290,6 +274,7 @@ public class FeatureTest {
/**
* Stops the hosts.
*/
+
public void stopHosts() {
hosts.forEach(host -> host.stop());
}
@@ -297,6 +282,7 @@ public class FeatureTest {
/**
* Verifies that all hosts processed at least one message.
*/
+
public void checkAllSawAMsg() {
int msgs = 0;
for (Host host : hosts) {
@@ -312,6 +298,7 @@ public class FeatureTest {
* @param host host
* @param func function to invoke
*/
+
public void withHost(Host host, VoidFunction func) {
currentHost = host;
func.apply();
@@ -323,6 +310,7 @@ public class FeatureTest {
*
* @param event event
*/
+
public void offerExternal(String event) {
externalTopic.offer(event);
}
@@ -333,6 +321,7 @@ public class FeatureTest {
* @param channel channel
* @param queue the channel's queue
*/
+
public void addInternal(String channel, BlockingQueue<String> queue) {
channel2queue.put(channel, queue);
}
@@ -342,6 +331,7 @@ public class FeatureTest {
*
* @param message message
*/
+
public void offerInternal(String message) {
channel2queue.values().forEach(queue -> queue.offer(message));
}
@@ -352,6 +342,7 @@ public class FeatureTest {
* @param channel channel
* @param message message
*/
+
public void offerInternal(String channel, String message) {
BlockingQueue<String> queue = channel2queue.get(channel);
if (queue != null) {
@@ -365,6 +356,7 @@ public class FeatureTest {
* @param controller controller
* @param droolsController drools controller
*/
+
public void addController(PolicyController controller, DroolsController droolsController) {
drools2policy.put(droolsController, controller);
}
@@ -376,6 +368,7 @@ public class FeatureTest {
* @return the controller associated with a drools controller, or {@code null} if
* it has no associated controller
*/
+
public PolicyController getController(DroolsController droolsController) {
return drools2policy.get(droolsController);
}
@@ -385,6 +378,7 @@ public class FeatureTest {
*
* @return queue for the external topic
*/
+
public BlockingQueue<String> getExternalTopic() {
return externalTopic;
}
@@ -394,6 +388,7 @@ public class FeatureTest {
*
* @return the number of decode errors so far
*/
+
public int getDecodeErrors() {
return numDecodeErrors.get();
}
@@ -401,6 +396,7 @@ public class FeatureTest {
/**
* Increments the count of decode errors.
*/
+
public void bumpDecodeErrors() {
numDecodeErrors.incrementAndGet();
}
@@ -410,6 +406,7 @@ public class FeatureTest {
*
* @return the number of events that haven't been processed
*/
+
public long getRemainingEvents() {
return eventCounter.getCount();
}
@@ -417,6 +414,7 @@ public class FeatureTest {
/**
* Adds an event to the counter.
*/
+
public void addEvent() {
eventCounter.countDown();
}
@@ -429,6 +427,7 @@ public class FeatureTest {
* @return {@code true} if all events have been processed, {@code false} otherwise
* @throws InterruptedException throws interrupted
*/
+
public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
return eventCounter.await(time, units);
}
@@ -439,6 +438,7 @@ public class FeatureTest {
*
* @return the current host, or {@code null} if there is no current host
*/
+
public Host getCurrentHost() {
return currentHost;
}
@@ -447,25 +447,27 @@ public class FeatureTest {
/**
* Simulates a single "host".
*/
- private static class Host {
+ private static class Host {
private final Context context;
-
private final PoolingFeature feature;
/**
* {@code True} if this host has processed a message, {@code false} otherwise.
*/
+
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
/**
* This host's internal "DMaaP" topic.
*/
+
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
* Source that reads from the external topic and posts to the listener.
*/
+
private TopicSource externalSource;
// mock objects
@@ -478,25 +480,20 @@ public class FeatureTest {
*
* @param context context
*/
+
public Host(Context context) {
this.context = context;
-
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
-
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
externalSource.unregister(controller);
return true;
});
-
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
-
context.addController(controller, drools);
-
// arrange to read from the external topic
externalSource = new TopicSourceImpl(context, false);
-
feature = new PoolingFeatureImpl(context);
}
@@ -505,6 +502,7 @@ public class FeatureTest {
*
* @return the host name
*/
+
public String getName() {
return feature.getHost();
}
@@ -513,21 +511,16 @@ public class FeatureTest {
* Starts threads for the host so that it begins consuming from both the external
* "DMaaP" topic and its own internal "DMaaP" topic.
*/
- public void start() {
+ public void start() {
context.withHost(this, () -> {
-
feature.beforeStart(engine);
feature.afterCreate(controller);
-
// assign the queue for this host's internal topic
context.addInternal(getName(), msgQueue);
-
feature.beforeStart(controller);
-
// start consuming events from the external topic
externalSource.register(controller);
-
feature.afterStart(controller);
});
}
@@ -535,6 +528,7 @@ public class FeatureTest {
/**
* Stops the host's threads.
*/
+
public void stop() {
feature.beforeStop(controller);
externalSource.unregister(controller);
@@ -549,6 +543,7 @@ public class FeatureTest {
* @param event event
* @return {@code true} if the event was handled, {@code false} otherwise
*/
+
public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
return feature.beforeOffer(controller, protocol, topic2, event);
}
@@ -562,8 +557,8 @@ public class FeatureTest {
* @param success success
* @return {@code true} if the event was handled, {@code false} otherwise
*/
- public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
+ public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
return feature.afterOffer(controller, protocol, topic, event, success);
}
@@ -573,6 +568,7 @@ public class FeatureTest {
* @param fact fact
* @return {@code true} if the event was handled, {@code false} otherwise
*/
+
public boolean beforeInsert(Object fact) {
return feature.beforeInsert(drools, fact);
}
@@ -585,6 +581,7 @@ public class FeatureTest {
* controller, {@code false} otherwise
* @return {@code true} if the event was handled, {@code false} otherwise
*/
+
public boolean afterInsert(Object fact, boolean successInsert) {
return feature.afterInsert(drools, fact, successInsert);
}
@@ -592,6 +589,7 @@ public class FeatureTest {
/**
* Indicates that a message was seen for this host.
*/
+
public void sawMessage() {
sawMsg.set(true);
}
@@ -602,6 +600,7 @@ public class FeatureTest {
* @return {@code true} if a message was seen for this host, {@code false}
* otherwise
*/
+
public boolean messageSeen() {
return sawMsg.get();
}
@@ -611,6 +610,7 @@ public class FeatureTest {
*
* @return the queue associated with this host's internal topic
*/
+
public BlockingQueue<String> getInternalQueue() {
return msgQueue;
}
@@ -620,8 +620,8 @@ public class FeatureTest {
* Listener for the external topic. Simulates the actions taken by
* <i>AggregatedPolicyController.onTopicEvent</i>.
*/
- private static class MyExternalTopicListener implements Answer<Void> {
+ private static class MyExternalTopicListener implements Answer<Void> {
private final Context context;
private final Host host;
@@ -636,30 +636,23 @@ public class FeatureTest {
CommInfrastructure commType = args.getArgument(index++);
String topic = args.getArgument(index++);
String event = args.getArgument(index++);
-
if (host.beforeOffer(commType, topic, event)) {
return null;
}
-
boolean result;
Object fact = decodeEvent(event);
-
if (fact == null) {
result = false;
context.bumpDecodeErrors();
-
} else {
result = true;
-
if (!host.beforeInsert(fact)) {
// feature did not handle it so we handle it here
host.afterInsert(fact, result);
-
host.sawMessage();
context.addEvent();
}
}
-
host.afterOffer(commType, topic, event, result);
return null;
}
@@ -670,10 +663,9 @@ public class FeatureTest {
* <i>channel</i> embedded within the message. If it's the "admin" channel, then the
* message is placed on all queues.
*/
- private static class TopicSinkImpl extends TopicImpl implements TopicSink {
+ private static class TopicSinkImpl extends TopicImpl implements TopicSink {
private final Context context;
-
/**
* Used to decode the messages so that the channel can be extracted.
*/
@@ -684,6 +676,7 @@ public class FeatureTest {
*
* @param context context
*/
+
public TopicSinkImpl(Context context) {
this.context = context;
}
@@ -693,22 +686,17 @@ public class FeatureTest {
if (!isAlive()) {
return false;
}
-
try {
Message msg = serializer.decodeMsg(message);
String channel = msg.getChannel();
-
if (Message.ADMIN.equals(channel)) {
// add to every queue
context.offerInternal(message);
-
} else {
// add to a specific queue
context.offerInternal(channel, message);
}
-
return true;
-
} catch (JsonParseException e) {
logger.warn("could not decode message: {}", message);
context.bumpDecodeErrors();
@@ -720,15 +708,14 @@ public class FeatureTest {
/**
* Source implementation that reads from a queue associated with a topic.
*/
+
private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
private final String topic;
-
/**
* Queue from which to retrieve messages.
*/
private final BlockingQueue<String> queue;
-
/**
* Manages the current consumer thread. The "first" item is used as a trigger to
* tell the thread to stop processing, while the "second" item is triggered <i>by
@@ -743,11 +730,11 @@ public class FeatureTest {
* @param internal {@code true} if to read from the internal topic, {@code false}
* to read from the external topic
*/
+
public TopicSourceImpl(Context context, boolean internal) {
if (internal) {
this.topic = INTERNAL_TOPIC;
this.queue = context.getCurrentHost().getInternalQueue();
-
} else {
this.topic = EXTERNAL_TOPIC;
this.queue = context.getExternalTopic();
@@ -773,33 +760,25 @@ public class FeatureTest {
* Starts a thread that takes messages from the queue and gives them to the
* listener. Stops the thread of any previously registered listener.
*/
+
@Override
public void register(TopicListener listener) {
- Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
-
+ Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
reregister(newPair);
-
Thread thread = new Thread(() -> {
-
try {
do {
- processMessages(newPair.first(), listener);
- } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
-
+ processMessages(newPair.getLeft(), listener);
+ } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
logger.info("topic source thread completed");
-
} catch (InterruptedException e) {
logger.warn("topic source thread aborted", e);
Thread.currentThread().interrupt();
-
} catch (RuntimeException e) {
logger.warn("topic source thread aborted", e);
}
-
- newPair.second().countDown();
-
+ newPair.getRight().countDown();
});
-
thread.setDaemon(true);
thread.start();
}
@@ -807,6 +786,7 @@ public class FeatureTest {
/**
* Stops the thread of <i>any</i> currently registered listener.
*/
+
@Override
public void unregister(TopicListener listener) {
reregister(null);
@@ -818,6 +798,7 @@ public class FeatureTest {
*
* @param newPair the new "pair", or {@code null} to unregister
*/
+
private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
try {
Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
@@ -826,26 +807,20 @@ public class FeatureTest {
// unregister was invoked twice in a row
logger.warn("re-unregister for topic source");
}
-
// no previous thread to stop
return;
}
-
// need to stop the previous thread
-
// tell it to stop
- oldPair.first().countDown();
-
+ oldPair.getLeft().countDown();
// wait for it to stop
- if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
+ if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
logger.warn("old topic registration is still running");
}
-
} catch (InterruptedException e) {
logger.warn("old topic registration may still be running", e);
Thread.currentThread().interrupt();
}
-
if (newPair != null) {
// register was invoked twice in a row
logger.warn("re-register for topic source");
@@ -859,15 +834,13 @@ public class FeatureTest {
* @param listener listener
* @throws InterruptedException throws interrupted exception
*/
- private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
+ private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
-
String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
if (msg == null) {
return;
}
-
listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
}
}
@@ -877,11 +850,13 @@ public class FeatureTest {
* Topic implementation. Most methods just throw
* {@link UnsupportedOperationException}.
*/
+
private static class TopicImpl implements Topic {
/**
* Constructor.
*/
+
public TopicImpl() {
super();
}
@@ -960,8 +935,8 @@ public class FeatureTest {
/**
* Feature with overrides.
*/
- private static class PoolingFeatureImpl extends PoolingFeature {
+ private static class PoolingFeatureImpl extends PoolingFeature {
private final Context context;
/**
@@ -969,9 +944,9 @@ public class FeatureTest {
*
* @param context context
*/
+
public PoolingFeatureImpl(Context context) {
this.context = context;
-
/*
* Note: do NOT extract anything from "context" at this point, because it
* hasn't been fully initialized yet
@@ -981,9 +956,7 @@ public class FeatureTest {
@Override
public Properties getProperties(String featName) {
Properties props = new Properties();
-
props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
-
props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
@@ -998,7 +971,6 @@ public class FeatureTest {
"" + stdActiveHeartbeatMs);
props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
"" + stdInterHeartbeatMs);
-
return props;
}
@@ -1014,6 +986,7 @@ public class FeatureTest {
* @param spec specializer to be embedded
* @return the property name, with the specializer embedded within it
*/
+
private String specialize(String propnm, String spec) {
String suffix = propnm.substring(PREFIX.length());
return PREFIX + spec + "." + suffix;
@@ -1022,9 +995,7 @@ public class FeatureTest {
@Override
protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
CountDownLatch activeLatch) {
-
currentContext.set(context);
-
return new PoolingManagerTest(host, controller, props, activeLatch);
}
}
@@ -1032,6 +1003,7 @@ public class FeatureTest {
/**
* Pooling Manager with overrides.
*/
+
private static class PoolingManagerTest extends PoolingManagerImpl {
/**
@@ -1042,9 +1014,9 @@ public class FeatureTest {
* @param props the properties
* @param activeLatch the latch
*/
+
public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
CountDownLatch activeLatch) {
-
super(host, controller, props, activeLatch);
}
@@ -1067,6 +1039,7 @@ public class FeatureTest {
/**
* DMaaP Manager with overrides.
*/
+
private static class DmaapManagerImpl extends DmaapManager {
/**
@@ -1076,6 +1049,7 @@ public class FeatureTest {
* @param topic the topic
* @throws PoolingFeatureException if an error occurs
*/
+
public DmaapManagerImpl(String topic) throws PoolingFeatureException {
super(topic);
}
@@ -1094,16 +1068,16 @@ public class FeatureTest {
/**
* Controller that also implements the {@link TopicListener} interface.
*/
- private static interface ListenerController extends PolicyController, TopicListener {
+ private static interface ListenerController extends PolicyController, TopicListener {
}
/**
* Simple function that takes no arguments and returns nothing.
*/
+
@FunctionalInterface
private static interface VoidFunction {
-
void apply();
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index f9878a9d..63bfc11e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,6 +39,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -46,7 +48,6 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
-import org.onap.policy.drools.utils.Pair;
public class PoolingFeatureTest {
@@ -83,7 +84,7 @@ public class PoolingFeatureTest {
/**
* Setup.
- *
+ *
* @throws Exception exception
*/
@Before
@@ -113,8 +114,8 @@ public class PoolingFeatureTest {
pool.afterCreate(controller1);
pool.afterCreate(controller2);
- mgr1 = managers.get(0).first();
- mgr2 = managers.get(1).first();
+ mgr1 = managers.get(0).getLeft();
+ mgr2 = managers.get(1).getLeft();
}
@Test
@@ -245,7 +246,7 @@ public class PoolingFeatureTest {
verify(mgr1).afterStop();
assertFalse(pool.afterStop(controllerDisabled));
-
+
// count should be unchanged
verify(mgr1).afterStop();
}
@@ -254,7 +255,7 @@ public class PoolingFeatureTest {
public void testAfterHalt() {
assertFalse(pool.afterHalt(controller1));
assertFalse(pool.afterHalt(controller1));
-
+
verify(mgr1, never()).afterStop();
assertFalse(pool.afterStop(controllerDisabled));
@@ -264,7 +265,7 @@ public class PoolingFeatureTest {
public void testAfterShutdown() {
assertFalse(pool.afterShutdown(controller1));
assertFalse(pool.afterShutdown(controller1));
-
+
verify(mgr1, never()).afterStop();
assertFalse(pool.afterStop(controllerDisabled));
@@ -515,7 +516,7 @@ public class PoolingFeatureTest {
PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
- managers.add(new Pair<>(mgr, props));
+ managers.add(Pair.of(mgr, props));
return mgr;
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index c8cbdbb5..e24c3c1e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,6 +37,7 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -44,7 +46,6 @@ import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
-import org.onap.policy.drools.utils.Pair;
import org.onap.policy.drools.utils.Triple;
public class ActiveStateTest extends SupportBasicStateTester {
@@ -54,6 +55,7 @@ public class ActiveStateTest extends SupportBasicStateTester {
/**
* Setup.
*/
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -70,7 +72,7 @@ public class ActiveStateTest extends SupportBasicStateTester {
// ensure a heart beat was generated
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.second().getSource());
+ assertEquals(MY_HOST, msg.getRight().getSource());
}
@Test
@@ -187,7 +189,7 @@ public class ActiveStateTest extends SupportBasicStateTester {
state = new ActiveState(mgr);
/*
- *
+ *
* PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus
* should be ignored.
*/
@@ -360,8 +362,8 @@ public class ActiveStateTest extends SupportBasicStateTester {
verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first());
- assertEquals(MY_HOST, msg.second().getSource());
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
}
@Test
@@ -454,8 +456,8 @@ public class ActiveStateTest extends SupportBasicStateTester {
verify(mgr, times(1)).publish(any(), any());
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first());
- assertEquals(MY_HOST, msg.second().getSource());
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
}
@Test
@@ -469,13 +471,13 @@ public class ActiveStateTest extends SupportBasicStateTester {
// this message should go to itself
msg = capturePublishedMessage(Heartbeat.class, index++);
- assertEquals(MY_HOST, msg.first());
- assertEquals(MY_HOST, msg.second().getSource());
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
// this message should go to its successor
msg = capturePublishedMessage(Heartbeat.class, index++);
- assertEquals(HOST1, msg.first());
- assertEquals(MY_HOST, msg.second().getSource());
+ assertEquals(HOST1, msg.getLeft());
+ assertEquals(MY_HOST, msg.getRight().getSource());
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
index 77491616..ab468a1c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -36,7 +38,6 @@ import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
-import org.onap.policy.drools.utils.Pair;
public class InactiveStateTest extends SupportBasicStateTester {
@@ -44,8 +45,9 @@ public class InactiveStateTest extends SupportBasicStateTester {
/**
* Setup.
- *
+ *
*/
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -111,13 +113,13 @@ public class InactiveStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_REACTIVATE_WAIT_MS, timer.first().longValue());
+ assertEquals(STD_REACTIVATE_WAIT_MS, timer.getLeft().longValue());
// invoke the task - it should go to the state returned by the mgr
State next = mock(State.class);
when(mgr.goStart()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, timer.getRight().fire());
}
@Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index 97c9c95a..aa999b5d 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -39,7 +41,6 @@ import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
-import org.onap.policy.drools.utils.Pair;
public class QueryStateTest extends SupportBasicStateTester {
@@ -48,6 +49,7 @@ public class QueryStateTest extends SupportBasicStateTester {
/**
* Setup.
*/
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -77,8 +79,8 @@ public class QueryStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
- assertNotNull(timer.second());
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
}
@Test
@@ -251,15 +253,15 @@ public class QueryStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
- assertNotNull(timer.second());
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
// should published an Offline message and go inactive
State next = mock(State.class);
when(mgr.goStart()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, timer.getRight().fire());
// should continue distributing
verify(mgr, never()).startDistributing(null);
@@ -275,13 +277,13 @@ public class QueryStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
- assertNotNull(timer.second());
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, timer.getRight().fire());
// should have published a Leader message
Leader msg = captureAdminMessage(Leader.class);
@@ -304,14 +306,14 @@ public class QueryStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
- assertNotNull(timer.second());
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
// set up active state, as that's what it should return
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, timer.getRight().fire());
// should NOT have published a Leader message
assertTrue(admin.isEmpty());
@@ -335,14 +337,14 @@ public class QueryStateTest extends SupportBasicStateTester {
Pair<Long, StateTimerTask> timer = onceTasks.remove();
- assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
- assertNotNull(timer.second());
+ assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
+ assertNotNull(timer.getRight());
// set up inactive state, as that's what it should return
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
- assertEquals(next, timer.second().fire());
+ assertEquals(next, timer.getRight().fire());
// should NOT have published a Leader message
assertTrue(admin.isEmpty());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 092657e5..1fd49c50 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,6 +32,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.Forward;
@@ -40,7 +42,6 @@ import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
-import org.onap.policy.drools.utils.Pair;
import org.onap.policy.drools.utils.Triple;
public class StartStateTest extends SupportBasicStateTester {
@@ -50,6 +51,7 @@ public class StartStateTest extends SupportBasicStateTester {
/**
* Setup.
*/
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -81,8 +83,8 @@ public class StartStateTest extends SupportBasicStateTester {
Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
- assertEquals(MY_HOST, msg.first());
- assertEquals(state.getHbTimestampMs(), msg.second().getTimestampMs());
+ assertEquals(MY_HOST, msg.getLeft());
+ assertEquals(state.getHbTimestampMs(), msg.getRight().getTimestampMs());
/*
@@ -95,11 +97,11 @@ public class StartStateTest extends SupportBasicStateTester {
// invoke the task - it should generate another heartbeat
assertEquals(null, generator.third().fire());
- verify(mgr, times(2)).publish(MY_HOST, msg.second());
+ verify(mgr, times(2)).publish(MY_HOST, msg.getRight());
// and again
assertEquals(null, generator.third().fire());
- verify(mgr, times(3)).publish(MY_HOST, msg.second());
+ verify(mgr, times(3)).publish(MY_HOST, msg.getRight());
/*
@@ -107,13 +109,13 @@ public class StartStateTest extends SupportBasicStateTester {
*/
Pair<Long, StateTimerTask> checker = onceTasks.removeFirst();
- assertEquals(STD_HEARTBEAT_WAIT_MS, checker.first().longValue());
+ assertEquals(STD_HEARTBEAT_WAIT_MS, checker.getLeft().longValue());
// invoke the task - it should go to the state returned by the mgr
State next = mock(State.class);
when(mgr.goInactive()).thenReturn(next);
- assertEquals(next, checker.second().fire());
+ assertEquals(next, checker.getRight().fire());
verify(mgr).startDistributing(null);
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
index 4727652a..a1246938 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
@@ -3,13 +3,14 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -35,13 +36,13 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
-import org.onap.policy.drools.utils.Pair;
import org.onap.policy.drools.utils.Triple;
/**
@@ -116,7 +117,7 @@ public class SupportBasicStateTester {
/**
* Setup.
- *
+ *
* @throws Exception throws exception
*/
public void setUp() throws Exception {
@@ -152,7 +153,7 @@ public class SupportBasicStateTester {
// capture publish() arguments
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
- published.add(new Pair<>((String) args[0], (Message) args[1]));
+ published.add(Pair.of((String) args[0], (Message) args[1]));
return null;
}).when(mgr).publish(anyString(), any(Message.class));
@@ -168,7 +169,7 @@ public class SupportBasicStateTester {
// capture schedule() arguments, and return a new future
when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
Object[] args = invocation.getArguments();
- onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
+ onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
onceSchedules.add(sched);
@@ -198,7 +199,7 @@ public class SupportBasicStateTester {
/**
* Makes a sorted set of hosts.
- *
+ *
* @param hosts the hosts to be sorted
* @return the set of hosts, sorted
*/
@@ -208,7 +209,7 @@ public class SupportBasicStateTester {
/**
* Captures the host array from the Leader message published to the admin channel.
- *
+ *
* @return the host array, as a list
*/
protected List<String> captureHostList() {
@@ -217,7 +218,7 @@ public class SupportBasicStateTester {
/**
* Captures the host array from the Leader message published to the admin channel.
- *
+ *
* @return the host array
*/
protected String[] captureHostArray() {
@@ -231,7 +232,7 @@ public class SupportBasicStateTester {
/**
* Captures the assignments from the Leader message published to the admin channel.
- *
+ *
* @return the bucket assignments
*/
protected BucketAssignments captureAssignments() {
@@ -244,7 +245,7 @@ public class SupportBasicStateTester {
/**
* Captures the message published to the admin channel.
- *
+ *
* @param clazz type of {@link Message} to capture
* @return the message that was published
*/
@@ -254,7 +255,7 @@ public class SupportBasicStateTester {
/**
* Captures the message published to the admin channel.
- *
+ *
* @param clazz type of {@link Message} to capture
* @param index index of the item to be captured
* @return the message that was published
@@ -265,7 +266,7 @@ public class SupportBasicStateTester {
/**
* Captures the message published to the non-admin channels.
- *
+ *
* @param clazz type of {@link Message} to capture
* @return the (channel,message) pair that was published
*/
@@ -275,13 +276,13 @@ public class SupportBasicStateTester {
/**
* Captures the message published to the non-admin channels.
- *
+ *
* @param clazz type of {@link Message} to capture
* @param index index of the item to be captured
* @return the (channel,message) pair that was published
*/
protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
Pair<String, Message> msg = published.get(index);
- return new Pair<>(msg.first(), clazz.cast(msg.second()));
+ return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));
}
}