summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-10-29 18:15:51 -0400
committerJim Hahn <jrh3@att.com>2020-11-02 18:17:49 -0500
commitdc6c4a21d46909dad59f4bd16cb6d4fc29fcce77 (patch)
treeef6bd7f152e189913c802d07aea43c83f3dc97e2 /feature-pooling-dmaap/src
parentd418aff3a9fd547941e40978c648d6209d332e37 (diff)
Make feature-pooling-dmaap work without filtering
As DMaaP server-side filtering has been deprecated, modified feature-pooling-dmaap to work without it. The new design assumes that each pdp gets its own unique consumer group, thus all pdps receive all events. Each pdp then uses the bucket assignments to determine whether or not to process the event. Note: this means that events no longer have to be forwarded to the correct host, thus the "Forward" class has been deleted. Other than that, the code already did post-filtering of events so most of it still works even without server-side filtering. As a result, most of the effort was in simply removing code that no longer applies. Per review comments: Modified code to use the event hash code instead of the request ID has code when routing events. This eliminated the need for the extractor classes and related properties. Replaced amsterdam and beijing properties with usecases properties. Issue-ID: POLICY-2881 Change-Id: I87e4f98c14f419593879c278d7da053c80575553 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src')
-rw-r--r--feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java33
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java54
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java10
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java221
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java458
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java36
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java58
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java61
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java22
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java179
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java49
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java42
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java88
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java32
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java348
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java33
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java445
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java40
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java31
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java35
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java78
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java74
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java100
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java214
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java13
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java110
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java31
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java15
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java12
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java15
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java30
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java37
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java8
39 files changed, 172 insertions, 3135 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
index 2786a613..59c4b472 100644
--- a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-pooling-dmaap
# ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,34 +65,23 @@
# Topic used for inter-host communication for a particular controller
# pooling.<controller-name>.topic=XXX
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
# Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties. However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties. However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
# the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 9ba844ed..08c82fea 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
@@ -46,7 +45,7 @@ public class DmaapManager {
/**
* Topic source whose filter is to be manipulated.
*/
- private final FilterableTopicSource topicSource;
+ private final TopicSource topicSource;
/**
* Where to publish messages.
@@ -79,9 +78,6 @@ public class DmaapManager {
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
- // verify that we can set the filter
- setFilter(null);
-
} catch (IllegalArgumentException e) {
logger.error("failed to attach to topic {}", topic);
throw new PoolingFeatureException(e);
@@ -98,15 +94,10 @@ public class DmaapManager {
* @return the topic source
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
- private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ private TopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : getTopicSources()) {
if (topic.equals(src.getTopic())) {
- if (src instanceof FilterableTopicSource) {
- return (FilterableTopicSource) src;
-
- } else {
- throw new PoolingFeatureException("topic source " + topic + " is not filterable");
- }
+ return src;
}
}
@@ -199,22 +190,6 @@ public class DmaapManager {
}
/**
- * Sets the server-side filter to be used by the consumer.
- *
- * @param filter the filter string, or {@code null} if no filter is to be used
- * @throws PoolingFeatureException if the topic is not filterable
- */
- public void setFilter(String filter) throws PoolingFeatureException {
- try {
- logger.debug("change filter for topic {} to {}", topic, filter);
- topicSource.setFilter(filter);
-
- } catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic, e);
- }
- }
-
- /**
* Publishes a message to the sink.
*
* @param msg message to be published
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 00b7a215..c7574925 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -77,11 +77,11 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
- * later. As multiple threads can be active within the methods at the same time, we must keep
- * this in thread local storage.
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
- private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
/**
* Constructor.
@@ -236,19 +236,19 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- if (mgr.beforeOffer(protocol, topic2, event)) {
+ if (mgr.beforeOffer(topic2, event)) {
return true;
}
- offerArgs.set(new OfferArgs(protocol, topic2, event));
+ offerTopics.set(topic2);
return false;
}
@Override
public boolean beforeInsert(DroolsController droolsController, Object fact) {
- OfferArgs args = offerArgs.get();
- if (args == null) {
+ String topic = offerTopics.get();
+ if (topic == null) {
logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
@@ -279,7 +279,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ return mgr.beforeInsert(topic, fact);
}
@Override
@@ -287,7 +287,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean success) {
// clear any stored arguments
- offerArgs.remove();
+ offerTopics.remove();
return false;
}
@@ -345,40 +345,6 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
- /**
- * Arguments captured from beforeOffer().
- */
- private static class OfferArgs {
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event text that was received on the topic.
- */
- private String event;
-
- /**
- * Constructor.
- *
- * @param protocol protocol
- * @param topic topic
- * @param event the actual event data received on the topic
- */
- public OfferArgs(CommInfrastructure protocol, String topic, String event) {
- this.protocol = protocol;
- this.topic = topic;
- this.event = event;
- }
- }
-
/*
* The remaining methods may be overridden by junit tests.
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index e3576b8f..cc8e3a4d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
@@ -84,13 +83,6 @@ public interface PoolingManager {
void publish(String channel, Message msg);
/**
- * Handles a {@link Forward} event that was received from the internal topic.
- *
- * @param event event
- */
- void handle(Forward event);
-
- /**
* Schedules a timer to fire after a delay.
*
* @param delayMs delay, in milliseconds
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index b2966101..9093c7c0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling;
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
- /**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
private final CountDownLatch activeLatch;
@@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final DmaapManager dmaapMgr;
/**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
- /**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
@@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
- /**
* Constructs the manager, initializing all of the data structures.
*
* @param host name/uuid of this host
@@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*/
@@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
@@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
@@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
/**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
- /**
* Decodes an event from a String into an event Object.
*
* @param topic2 topic
@@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
- /**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
* decoded.
@@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index 80eee501..4627b9eb 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import java.util.HashMap;
import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
@@ -60,7 +59,6 @@ public class Serializer {
private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
static {
- class2type.put(Forward.class, "forward");
class2type.put(Heartbeat.class, "heartbeat");
class2type.put(Identification.class, "identification");
class2type.put(Leader.class, "leader");
@@ -79,7 +77,7 @@ public class Serializer {
/**
* Encodes a filter.
- *
+ *
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
*/
@@ -89,7 +87,7 @@ public class Serializer {
/**
* Encodes a message.
- *
+ *
* @param msg message to be encoded
* @return the message, serialized as a JSON string
*/
@@ -108,7 +106,7 @@ public class Serializer {
/**
* Decodes a JSON string into a Message.
- *
+ *
* @param msg JSON string representing the message
* @return the message
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
deleted file mode 100644
index bd75995f..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
- private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
- /**
- * Properties that specify how the data is to be extracted from a given
- * class.
- */
- private final Properties properties;
-
- /**
- * Property prefix, including a trailing ".".
- */
- private final String prefix;
-
- /**
- * Type of item to be extracted.
- */
- private final String type;
-
- /**
- * Maps the class name to its extractor.
- */
- private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param props properties that specify how the data is to be extracted from
- * a given class
- * @param prefix property name prefix, prepended before the class name
- * @param type type of item to be extracted
- */
- public ClassExtractors(Properties props, String prefix, String type) {
- this.properties = props;
- this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
- this.type = type;
- }
-
- /**
- * Gets the number of extractors in the map.
- *
- * @return gets the number of extractors in the map
- */
- protected int size() {
- return class2extractor.size();
- }
-
- /**
- * Extracts the desired data item from an object.
- *
- * @param object object from which to extract the data item
- * @return the extracted item, or {@code null} if it could not be extracted
- */
- public Object extract(Object object) {
- if (object == null) {
- return null;
- }
-
- Extractor ext = getExtractor(object);
-
- return ext.extract(object);
- }
-
- /**
- * Gets the extractor for the given type of object, creating one if it
- * doesn't exist yet.
- *
- * @param object object whose extracted is desired
- * @return an extractor for the object
- */
- private Extractor getExtractor(Object object) {
- Class<?> clazz = object.getClass();
- Extractor ext = class2extractor.get(clazz.getName());
-
- if (ext == null) {
- // allocate a new extractor, if another thread doesn't beat us to it
- ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
-
- return ext;
- }
-
- /**
- * Builds an extractor for the class.
- *
- * @param clazz class for which the extractor should be built
- *
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz) {
- String value = properties.getProperty(prefix + clazz.getName(), null);
- if (value != null) {
- // property has config info for this class - build the extractor
- return buildExtractor(clazz, value);
- }
-
- /*
- * Get the extractor, if any, for the super class or interfaces, but
- * don't add one if it doesn't exist
- */
- Extractor ext = getClassExtractor(clazz, false);
- if (ext != null) {
- return ext;
- }
-
- /*
- * No extractor defined for for this class or its super class - we
- * cannot extract data items from objects of this type, so just
- * allocated a null extractor.
- */
- logger.warn("missing property {}{}", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- /**
- * Builds an extractor for the class, based on the config value extracted
- * from the corresponding property.
- *
- * @param clazz class for which the extractor should be built
- * @param value config value (e.g., "${event.request.id}"
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz, String value) {
- if (!value.startsWith("${")) {
- logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
- return new NullExtractor();
- }
-
- if (!value.endsWith("}")) {
- logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // get the part in the middle
- String val = value.substring(2, value.length() - 1);
- if (val.startsWith(".")) {
- logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- if (val.endsWith(".")) {
- logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // everything's valid - create the extractor
- try {
- ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
- /*
- * If there's only one extractor, then just return it, otherwise
- * return the whole extractor.
- */
- return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
- } catch (ExtractorException e) {
- logger.warn("cannot build extractor for {}", clazz.getName(), e);
- return new NullExtractor();
- }
- }
-
- /**
- * Gets the extractor for a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose extractor is desired
- * @param addOk {@code true} if the extractor may be added, provided the
- * property is defined, {@code false} otherwise
- * @return the extractor to be used for the class, or {@code null} if no
- * extractor has been defined yet
- */
- private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
- if (clazz == null) {
- return null;
- }
-
- Extractor ext = null;
-
- if (addOk) {
- String val = properties.getProperty(prefix + clazz.getName(), null);
-
- if (val != null) {
- /*
- * A property is defined for this class, so create the extractor
- * for it.
- */
- return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
- }
-
- // see if the superclass has an extractor
- if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
- return ext;
- }
-
- // check the interfaces, too
- for (Class<?> clz : clazz.getInterfaces()) {
- if ((ext = getClassExtractor(clz, true)) != null) {
- break;
- }
- }
-
- return ext;
- }
-
- /**
- * Extractor that always returns {@code null}. Used when no extractor could
- * be built for a given object type.
- */
- private class NullExtractor implements Extractor {
-
- @Override
- public Object extract(Object object) {
- logger.info("cannot extract {} from {}", type, object.getClass());
- return null;
- }
- }
-
- /**
- * Component-ized extractor. Extracts an object that is referenced
- * hierarchically, where each name identifies a particular component within
- * the hierarchy. Supports retrieval from {@link Map} objects, as well as
- * via getXxx() methods, or by direct field retrieval.
- *
- * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
- */
- private class ComponetizedExtractor implements Extractor {
-
- /**
- * Extractor for each component.
- */
- private final Extractor[] extractors;
-
- /**
- * Constructor.
- *
- * @param clazz the class associated with the object at the root of the
- * hierarchy
- * @param names name associated with each component
- * @throws ExtractorException extractor exception
- */
- public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
- this.extractors = new Extractor[names.length];
-
- Class<?> clz = clazz;
-
- for (int x = 0; x < names.length; ++x) {
- String comp = names[x];
-
- Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
- extractors[x] = pair.getLeft();
- clz = pair.getRight();
- }
- }
-
- /**
- * Builds an extractor for the given component of an object.
- *
- * @param clazz type of object from which the component will be
- * extracted
- * @param comp name of the component to extract
- * @return a pair containing the extractor and the extracted object's
- * type
- * @throws ExtractorException extrator exception
- */
- private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
- Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
- if (pair == null) {
- pair = getFieldExtractor(clazz, comp);
- }
-
- if (pair == null) {
- pair = getMapExtractor(clazz, comp);
- }
-
-
- // didn't find an extractor
- if (pair == null) {
- throw new ExtractorException("class " + clazz + " contains no element " + comp);
- }
-
- return pair;
- }
-
- @Override
- public Object extract(Object object) {
- Object obj = object;
-
- for (Extractor ext : extractors) {
- if (obj == null) {
- break;
- }
-
- obj = ext.extract(obj);
- }
-
- return obj;
- }
-
- /**
- * Gets an extractor that invokes a getXxx() method to retrieve the
- * object.
- *
- * @param clazz container's class
- * @param name name of the property to be retrieved
- * @return a new extractor, or {@code null} if the class does not
- * contain the corresponding getXxx() method
- * @throws ExtractorException if the getXxx() method is inaccessible
- */
- private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
- Method meth;
-
- String nm = "get" + StringUtils.capitalize(name);
-
- try {
- meth = clazz.getMethod(nm);
-
- Class<?> retType = meth.getReturnType();
- if (retType == void.class) {
- // it's a void method, thus it won't return an object
- return null;
- }
-
- return Pair.of(new MethodExtractor(meth), retType);
-
- } catch (NoSuchMethodException expected) {
- // no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
- }
- }
-
- /**
- * Gets an extractor for a field within the object.
- *
- * @param clazz container's class
- * @param name name of the field whose value is to be extracted
- * @return a new extractor, or {@code null} if the class does not
- * contain the given field
- * @throws ExtractorException if the field is inaccessible
- */
- private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
- Field field = getClassField(clazz, name);
- if (field == null) {
- return null;
- }
-
- return Pair.of(new FieldExtractor(field), field.getType());
- }
-
- /**
- * Gets an extractor for an item within a Map object.
- *
- * @param clazz container's class
- * @param key item key within the map
- * @return a new extractor, or {@code null} if the class is not a Map
- * subclass
- */
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
- if (!Map.class.isAssignableFrom(clazz)) {
- return null;
- }
-
- /*
- * Don't know the value's actual type, so we'll assume it's a Map
- * for now. Things should still work OK, as this is only used to
- * direct the constructor on what type of extractor to create next.
- * If the object turns out not to be a map, then the MapExtractor
- * for the next component will just return null.
- */
- return Pair.of(new MapExtractor(key), Map.class);
- }
-
- /**
- * Gets field within a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose field is desired
- * @param name name of the desired field
- * @return the field within the class, or {@code null} if the field does
- * not exist
- * @throws ExtractorException if the field is inaccessible
- */
- private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
- if (clazz == null) {
- return null;
- }
-
- try {
- return clazz.getField(name);
-
- } catch (NoSuchFieldException expected) {
- // no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
- }
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
deleted file mode 100644
index 5e02d602..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
- /**
- * Extracts an object contained within another object.
- *
- * @param object object from which to extract the contained object
- * @return the extracted value, or {@code null} if it cannot be extracted
- */
- Object extract(Object object);
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
deleted file mode 100644
index a864672b..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public ExtractorException() {
- super();
- }
-
- public ExtractorException(String message) {
- super(message);
- }
-
- public ExtractorException(Throwable cause) {
- super(cause);
- }
-
- public ExtractorException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
deleted file mode 100644
index 9389ab22..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
- /**
- * Field containing the object.
- */
- private final Field field;
-
- /**
- * Constructor.
- *
- * @param field field containing the object
- */
- public FieldExtractor(Field field) {
- this.field = field;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return field.get(object);
-
- } catch (IllegalAccessException | IllegalArgumentException e) {
- logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
deleted file mode 100644
index 9c5be5ff..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
- /**
- * Key to the item to extract from the map.
- */
- private final String key;
-
- /**
- * Constructor.
- *
- * @param key key to the item to extract from the map
- */
- public MapExtractor(String key) {
- this.key = key;
- }
-
- @Override
- public Object extract(Object object) {
-
- if (object instanceof Map) {
- Map<?, ?> map = (Map<?, ?>) object;
-
- return map.get(key);
-
- } else {
- logger.warn("expecting a map, instead of {}", object.getClass());
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
deleted file mode 100644
index 3efef5ec..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
- /**
- * Method to invoke to extract the contained object.
- */
- private final Method method;
-
- /**
- * Constructor.
- *
- * @param method method to invoke to extract the contained object
- */
- public MethodExtractor(Method method) {
- this.method = method;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return method.invoke(object);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index 6be080f7..c78fb17b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,7 +63,7 @@ public class BucketAssignments {
/**
* Constructor.
- *
+ *
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
*/
@@ -81,7 +81,7 @@ public class BucketAssignments {
/**
* Gets the leader, which is the host with the minimum UUID.
- *
+ *
* @return the assignment leader
*/
public String getLeader() {
@@ -103,7 +103,7 @@ public class BucketAssignments {
/**
* Determines if a host has an assignment.
- *
+ *
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
@@ -123,7 +123,7 @@ public class BucketAssignments {
/**
* Gets all of the hosts that have an assignment.
- *
+ *
* @return all of the hosts that have an assignment
*/
public Set<String> getAllHosts() {
@@ -143,7 +143,7 @@ public class BucketAssignments {
/**
* Gets the host assigned to a given bucket.
- *
+ *
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
@@ -153,12 +153,12 @@ public class BucketAssignments {
return null;
}
- return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
* Gets the number of buckets.
- *
+ *
* @return the number of buckets
*/
public int size() {
@@ -168,7 +168,7 @@ public class BucketAssignments {
/**
* Checks the validity of the assignments, verifying that all buckets have been
* assigned to a host.
- *
+ *
* @throws PoolingFeatureException if the assignments are invalid
*/
public void checkValidity() throws PoolingFeatureException {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
deleted file mode 100644
index 3c34e971..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
- /**
- * Number of hops (i.e., number of times it's been forwarded) so far.
- */
- private int numHops;
-
- /**
- * Time, in milliseconds, at which the message was created.
- */
- private long createTimeMs;
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event pay load that was received on the topic.
- */
- private String payload;
-
- /**
- * The request id that was extracted from the event.
- */
- private String requestId;
-
- /**
- * Constructor.
- */
- public Forward() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param source host on which the message originated
- * @param protocol protocol
- * @param topic topic
- * @param payload the actual event data received on the topic
- * @param requestId request id
- */
- public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
- super(source);
-
- this.numHops = 0;
- this.createTimeMs = System.currentTimeMillis();
- this.protocol = protocol;
- this.topic = topic;
- this.payload = payload;
- this.requestId = requestId;
- }
-
- /**
- * Increments {@link #numHops}.
- */
- public void bumpNumHops() {
- ++numHops;
- }
-
- public int getNumHops() {
- return numHops;
- }
-
- public void setNumHops(int numHops) {
- this.numHops = numHops;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public void setCreateTimeMs(long createTimeMs) {
- this.createTimeMs = createTimeMs;
- }
-
- public CommInfrastructure getProtocol() {
- return protocol;
- }
-
- public void setProtocol(CommInfrastructure protocol) {
- this.protocol = protocol;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public boolean isExpired(long minCreateTimeMs) {
- return (createTimeMs < minCreateTimeMs);
-
- }
-
- @Override
- public void checkValidity() throws PoolingFeatureException {
-
- super.checkValidity();
-
- if (protocol == null) {
- throw new PoolingFeatureException("missing message protocol");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new PoolingFeatureException("missing message topic");
- }
-
- /*
- * Note: an empty pay load is OK, as an empty message could have been
- * received on the topic.
- */
-
- if (payload == null) {
- throw new PoolingFeatureException("missing message payload");
- }
-
- if (requestId == null || requestId.isEmpty()) {
- throw new PoolingFeatureException("missing message requestId");
- }
-
- if (numHops < 0) {
- throw new PoolingFeatureException("invalid message hop count");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
deleted file mode 100644
index 5b7012d2..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
- // message element names
- public static final String MSG_CHANNEL = "channel";
- public static final String MSG_TIMESTAMP = "timestampMs";
-
- // json element names
- protected static final String JSON_CLASS = "class";
- protected static final String JSON_FILTERS = "filters";
- protected static final String JSON_FIELD = "field";
- protected static final String JSON_VALUE = "value";
-
- // values to be stuck into the "class" element
- protected static final String CLASS_OR = "Or";
- protected static final String CLASS_AND = "And";
- protected static final String CLASS_EQUALS = "Equals";
-
- /**
- * Constructor.
- */
- private FilterUtils() {
- super();
- }
-
- /**
- * Makes a filter that verifies that a field equals a value.
- *
- * @param field name of the field to check
- * @param value desired value
- * @return a map representing an "equals" filter
- */
- public static Map<String, Object> makeEquals(String field, String value) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_EQUALS);
- map.put(JSON_FIELD, field);
- map.put(JSON_VALUE, value);
-
- return map;
- }
-
- /**
- * Makes an "and" filter, where all of the items must be true.
- *
- * @param items items to be checked
- * @return an "and" filter
- */
- public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_AND);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-
- /**
- * Makes an "or" filter, where at least one of the items must be true.
- *
- * @param items items to be checked
- * @return an "or" filter
- */
- public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_OR);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index 59d264ec..c5761bfd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,16 +20,8 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +41,7 @@ public class StartState extends State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
@@ -58,7 +50,7 @@ public class StartState extends State {
/**
* Get Heart beat time stamp in milliseconds.
- *
+ *
* @return the time stamp inserted into the heart beat message
*/
public long getHbTimestampMs() {
@@ -110,14 +102,4 @@ public class StartState extends State {
return null;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getFilter() {
- // ignore everything except our most recent heart beat message
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
- makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index f2277be3..853c24d4 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -20,22 +20,15 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
@@ -71,18 +64,6 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic. The
- * default method returns a filter that accepts messages on the admin channel and on
- * the host's own channel.
- *
- * @return the server-side filter to use.
- */
- @SuppressWarnings("unchecked")
- public Map<String, Object> getFilter() {
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
- }
-
- /**
* Cancels the timers added by this state.
*/
public final void cancelTimers() {
@@ -142,26 +123,6 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to handle and
- * returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Forward msg) {
- if (getHost().equals(msg.getChannel())) {
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
-
- } else {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
- getTopic());
- }
-
- return null;
- }
-
- /**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
@@ -293,16 +254,6 @@ public abstract class State {
* @param channel channel
* @param msg message to be published
*/
- protected final void publish(String channel, Forward msg) {
- mgr.publish(channel, msg);
- }
-
- /**
- * Publishes a message on the specified channel.
- *
- * @param channel channel
- * @param msg message to be published
- */
protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index 7f73a702..ec554fc9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -48,10 +47,9 @@ public class DmaapManagerTest {
private static final String EXPECTED = "expected";
private static final String MY_TOPIC = "my.topic";
private static final String MSG = "a message";
- private static final String FILTER = "a filter";
private TopicListener listener;
- private FilterableTopicSource source;
+ private TopicSource source;
private boolean gotSources;
private TopicSink sink;
private boolean gotSinks;
@@ -65,7 +63,7 @@ public class DmaapManagerTest {
@Before
public void setUp() throws Exception {
listener = mock(TopicListener.class);
- source = mock(FilterableTopicSource.class);
+ source = mock(TopicSource.class);
gotSources = false;
sink = mock(TopicSink.class);
gotSinks = false;
@@ -104,35 +102,12 @@ public class DmaapManagerTest {
};
}
- @Test(expected = PoolingFeatureException.class)
- public void testDmaapManager_CannotFilter() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- new DmaapManagerImpl(MY_TOPIC);
- }
-
@Test
public void testGetTopic() {
assertEquals(MY_TOPIC, mgr.getTopic());
}
@Test(expected = PoolingFeatureException.class)
- public void testFindTopicSource_NotFilterableTopicSource() throws PoolingFeatureException {
-
- // matching topic, but doesn't have the correct interface
- TopicSource source2 = mock(TopicSource.class);
- when(source2.getTopic()).thenReturn(MY_TOPIC);
-
- new DmaapManagerImpl(MY_TOPIC) {
- @Override
- protected List<TopicSource> getTopicSources() {
- return Arrays.asList(source2);
- }
- };
- }
-
- @Test(expected = PoolingFeatureException.class)
public void testFindTopicSource_NotFound() throws PoolingFeatureException {
// one item in list, and its topic doesn't match
new DmaapManagerImpl(MY_TOPIC) {
@@ -275,19 +250,6 @@ public class DmaapManagerTest {
}
@Test
- public void testSetFilter() throws PoolingFeatureException {
- assertThatCode(() -> mgr.setFilter(FILTER)).doesNotThrowAnyException();
- }
-
- @Test(expected = PoolingFeatureException.class)
- public void testSetFilter_Exception() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- mgr.setFilter(FILTER);
- }
-
- @Test
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index 96b358da..efab636b 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,20 +47,19 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
@@ -216,10 +215,6 @@ public class FeatureTest {
*/
private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
/**
- * Queue for the external "DMaaP" topic.
- */
- private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
- /**
* Counts the number of decode errors.
*/
private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
@@ -306,13 +301,16 @@ public class FeatureTest {
}
/**
- * Offers an event to the external topic.
+ * Offers an event to the external topic. As each host needs a copy, it is posted
+ * to each Host's queue.
*
* @param event event
*/
public void offerExternal(String event) {
- externalTopic.offer(event);
+ for (Host host : hosts) {
+ host.getExternalTopic().offer(event);
+ }
}
/**
@@ -337,20 +335,6 @@ public class FeatureTest {
}
/**
- * Offers amessage to an internal channel.
- *
- * @param channel channel
- * @param message message
- */
-
- public void offerInternal(String channel, String message) {
- BlockingQueue<String> queue = channel2queue.get(channel);
- if (queue != null) {
- queue.offer(message);
- }
- }
-
- /**
* Associates a controller with its drools controller.
*
* @param controller controller
@@ -374,16 +358,6 @@ public class FeatureTest {
}
/**
- * Constructor.
- *
- * @return queue for the external topic
- */
-
- public BlockingQueue<String> getExternalTopic() {
- return externalTopic;
- }
-
- /**
* Get decode errors.
*
* @return the number of decode errors so far
@@ -465,6 +439,12 @@ public class FeatureTest {
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
+ * Queue for the external "DMaaP" topic.
+ */
+ @Getter
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
+ /**
* Source that reads from the external topic and posts to the listener.
*/
@@ -493,7 +473,7 @@ public class FeatureTest {
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
context.addController(controller, drools);
// arrange to read from the external topic
- externalSource = new TopicSourceImpl(context, false);
+ externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
feature = new PoolingFeatureImpl(context);
}
@@ -666,10 +646,6 @@ public class FeatureTest {
private static class TopicSinkImpl extends TopicImpl implements TopicSink {
private final Context context;
- /**
- * Used to decode the messages so that the channel can be extracted.
- */
- private final Serializer serializer = new Serializer();
/**
* Constructor.
@@ -687,15 +663,7 @@ public class FeatureTest {
return false;
}
try {
- Message msg = serializer.decodeMsg(message);
- String channel = msg.getChannel();
- if (Message.ADMIN.equals(channel)) {
- // add to every queue
- context.offerInternal(message);
- } else {
- // add to a specific queue
- context.offerInternal(channel, message);
- }
+ context.offerInternal(message);
return true;
} catch (JsonParseException e) {
logger.warn("could not decode message: {}", message);
@@ -709,7 +677,7 @@ public class FeatureTest {
* Source implementation that reads from a queue associated with a topic.
*/
- private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+ private static class TopicSourceImpl extends TopicImpl implements TopicSource {
private final String topic;
/**
@@ -726,24 +694,13 @@ public class FeatureTest {
/**
* Constructor.
*
- * @param context context
- * @param internal {@code true} if to read from the internal topic, {@code false}
- * to read from the external topic
+ * @param type topic type
+ * @param queue topic from which to read
*/
- public TopicSourceImpl(Context context, boolean internal) {
- if (internal) {
- this.topic = INTERNAL_TOPIC;
- this.queue = context.getCurrentHost().getInternalQueue();
- } else {
- this.topic = EXTERNAL_TOPIC;
- this.queue = context.getExternalTopic();
- }
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("topic filter set to: {}", filter);
+ public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+ this.topic = type;
+ this.queue = queue;
}
@Override
@@ -1056,7 +1013,8 @@ public class FeatureTest {
@Override
protected List<TopicSource> getTopicSources() {
- return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+ return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+ currentContext.get().getCurrentHost().getInternalQueue()));
}
@Override
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 63bfc11e..02a4db5c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -298,20 +298,20 @@ public class PoolingFeatureTest {
@Test
public void testBeforeOffer() {
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure that the args were captured
pool.beforeInsert(drools1, OBJECT1);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
// ensure that the new args were captured
pool.beforeInsert(drools1, OBJECT2);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
@@ -326,14 +326,14 @@ public class PoolingFeatureTest {
public void testBeforeOffer_MgrTrue() {
// manager will return true
- when(mgr1.beforeOffer(any(), any(), any())).thenReturn(true);
+ when(mgr1.beforeOffer(any(), any())).thenReturn(true);
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure it's still in the map by re-invoking
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
}
@@ -342,12 +342,12 @@ public class PoolingFeatureTest {
public void testBeforeInsert() {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(drools1, OBJECT2));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
@@ -358,10 +358,10 @@ public class PoolingFeatureTest {
// call beforeInsert without beforeOffer
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -376,7 +376,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -391,7 +391,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -407,7 +407,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -426,7 +426,7 @@ public class PoolingFeatureTest {
assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 2a0066b7..21bd62d1 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -25,10 +25,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -49,9 +47,7 @@ import org.mockito.ArgumentCaptor;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,7 +79,6 @@ public class PoolingManagerImplTest {
private static final String THE_EVENT = "the event";
private static final Object DECODED_EVENT = new Object();
- private static final String REQUEST_ID = "my.request.id";
/**
* Number of dmaap.publish() invocations that should be issued when the manager is
@@ -98,7 +93,6 @@ public class PoolingManagerImplTest {
private PoolingProperties poolProps;
private ListeningController controller;
- private ClassExtractors extractors;
private DmaapManager dmaap;
private boolean gotDmaap;
private ScheduledThreadPoolExecutor sched;
@@ -132,7 +126,6 @@ public class PoolingManagerImplTest {
ser = new Serializer();
active = new CountDownLatch(1);
- extractors = mock(ClassExtractors.class);
dmaap = mock(DmaapManager.class);
gotDmaap = false;
controller = mock(ListeningController.class);
@@ -140,8 +133,6 @@ public class PoolingManagerImplTest {
schedCount = 0;
drools = mock(DroolsController.class);
- when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
-
when(controller.getName()).thenReturn(MY_CONTROLLER);
when(controller.getDrools()).thenReturn(drools);
when(controller.isAlive()).thenReturn(true);
@@ -176,18 +167,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testPoolingManagerImpl_ClassEx() {
- /*
- * this controller does not implement TopicListener, which should cause a
- * ClassCastException
- */
- PolicyController ctlr = mock(PolicyController.class);
-
- assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, ctlr, poolProps, active))
- .isInstanceOf(PoolingFeatureRtException.class).hasCauseInstanceOf(ClassCastException.class);
- }
-
- @Test
public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
// throw an exception when we try to create the dmaap manager
PoolingFeatureException ex = new PoolingFeatureException();
@@ -284,23 +263,20 @@ public class PoolingManagerImplTest {
startMgr();
mgr.startDistributing(makeAssignments(false));
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ verify(dmaap, times(START_PUB)).publish(any());
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
- verify(dmaap, times(START_PUB + 2)).publish(any());
+ verify(dmaap, times(START_PUB + 1)).publish(any());
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
// verify that next message is handled locally
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 2)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(dmaap, times(START_PUB + 1)).publish(any());
}
@Test
@@ -407,19 +383,11 @@ public class PoolingManagerImplTest {
// start should invoke changeState()
startMgr();
- int ntimes = 0;
-
- // should have set the filter for the StartState
- verify(dmaap, times(++ntimes)).setFilter(any());
-
/*
* now go offline while it's locked
*/
lockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// should have cancelled the timers
assertEquals(2, futures.size());
verify(futures.poll()).cancel(false);
@@ -430,9 +398,6 @@ public class PoolingManagerImplTest {
*/
unlockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// new timers should now be active
assertEquals(2, futures.size());
verify(futures.poll(), never()).cancel(false);
@@ -440,26 +405,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testSetFilter() throws Exception {
- // start should cause a filter to be set
- startMgr();
-
- verify(dmaap).setFilter(any());
- }
-
- @Test
- public void testSetFilter_DmaapEx() throws Exception {
-
- // generate an exception
- doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
-
- // start should invoke setFilter()
- assertThatCode(() -> startMgr()).doesNotThrowAnyException();
-
- // no exception, means success
- }
-
- @Test
public void testSchedule() throws Exception {
// must start the scheduler
startMgr();
@@ -583,64 +528,35 @@ public class PoolingManagerImplTest {
}
@Test
- public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+ public void testBeforeOffer_Unlocked() throws Exception {
startMgr();
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testBeforeOffer_Locked_NoIntercept() throws Exception {
- startMgr();
-
- lockMgr();
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeOffer_Locked_Intercept() throws Exception {
+ public void testBeforeOffer_Locked() throws Exception {
startMgr();
lockMgr();
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(false);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeInsert_Intercept() throws Exception {
+ public void testBeforeInsert() throws Exception {
startMgr();
lockMgr();
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testBeforeInsert_NoIntercept() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
@@ -657,17 +573,17 @@ public class PoolingManagerImplTest {
public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
startMgr();
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
}
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
@@ -678,7 +594,7 @@ public class PoolingManagerImplTest {
@Test
public void testHandleEvent_NullTarget() throws Exception {
// buckets have null targets
- validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
+ validateDiscarded(new BucketAssignments(new String[] {null, null}));
}
@Test
@@ -687,46 +603,9 @@ public class PoolingManagerImplTest {
}
@Test
- public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
- mgr.handle(msg);
-
- // shouldn't publish
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandleEvent_DiffHost_Forward() throws Exception {
- validateHandled(makeAssignments(false), START_PUB + 1);
- }
-
- @Test
- public void testExtractRequestId_NullEvent() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
- }
-
- @Test
- public void testExtractRequestId_NullReqId() throws Exception {
- validateHandleReqId(null);
- }
-
- @Test
- public void testExtractRequestId() throws Exception {
- startMgr();
-
+ public void testHandleEvent_DiffHost() throws Exception {
// route the message to the *OTHER* host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateDiscarded(makeAssignments(false));
}
@Test
@@ -746,7 +625,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -767,7 +646,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -787,7 +666,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -807,7 +686,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -819,101 +698,7 @@ public class PoolingManagerImplTest {
// route to another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testMakeForward() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- }
-
- @Test
- public void testMakeForward_InvalidMsg() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- // should not have tried to publish a message
- verify(dmaap, times(START_PUB)).publish(any());
- }
-
- @Test
- public void testHandle_SameHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandle_DiffHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testInject() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testInject_Ex() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- // generate RuntimeException when onTopicEvent() is invoked
- doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -971,20 +756,18 @@ public class PoolingManagerImplTest {
// null assignments should cause message to be processed locally
mgr.startDistributing(null);
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
- // route the message to this host
+ // message for this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- // route the message to the other host
+ // message for another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
@@ -1088,9 +871,7 @@ public class PoolingManagerImplTest {
private void validateHandleReqId(String requestId) throws PoolingFeatureException {
startMgr();
- when(extractors.extract(any())).thenReturn(requestId);
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
private void validateNoForward() throws PoolingFeatureException {
@@ -1099,67 +880,23 @@ public class PoolingManagerImplTest {
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
}
- private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+ private void validateUnhandled() throws PoolingFeatureException {
startMgr();
-
- // route the message to the *OTHER* host
- mgr.startDistributing(assignments);
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(publishCount)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
- private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+ private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
startMgr();
- assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
- }
-
- /**
- * Configure the mock controller to act like a real controller, invoking beforeOffer
- * and then beforeInsert, so we can make sure they pass through. We'll keep count to
- * ensure we don't get into infinite recursion.
- *
- * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
- * {@code false} if it should be skipped
- *
- * @return a latch that will be counted down if both beforeXxx() methods return false
- */
- private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
- CountDownLatch recursion = new CountDownLatch(3);
- CountDownLatch latch = new CountDownLatch(1);
-
- doAnswer(args -> {
-
- recursion.countDown();
- if (recursion.getCount() == 0) {
- fail("recursive calls to onTopicEvent");
- }
-
- int iarg = 0;
- CommInfrastructure proto = args.getArgument(iarg++);
- String topic = args.getArgument(iarg++);
- String event = args.getArgument(iarg++);
-
- if (mgr.beforeOffer(proto, topic, event)) {
- return null;
- }
- if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
- return null;
- }
-
- latch.countDown();
-
- return null;
- }).when(controller).onTopicEvent(any(), any(), any());
+ // buckets have null targets
+ mgr.startDistributing(bucketAssignments);
- return latch;
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
/**
@@ -1170,7 +907,7 @@ public class PoolingManagerImplTest {
* @return a new bucket assignment
*/
private BucketAssignments makeAssignments(boolean sameHost) {
- int slot = REQUEST_ID.hashCode() % 2;
+ int slot = DECODED_EVENT.hashCode() % 2;
// slot numbers are 0 and 1 - reverse them if it's for a different host
if (!sameHost) {
@@ -1199,6 +936,7 @@ public class PoolingManagerImplTest {
*/
private void lockMgr() {
mgr.beforeLock();
+ when(controller.isLocked()).thenReturn(true);
}
/**
@@ -1206,6 +944,7 @@ public class PoolingManagerImplTest {
*/
private void unlockMgr() {
mgr.afterUnlock();
+ when(controller.isLocked()).thenReturn(false);
}
/**
@@ -1227,11 +966,6 @@ public class PoolingManagerImplTest {
}
@Override
- protected ClassExtractors makeClassExtractors(Properties props) {
- return extractors;
- }
-
- @Override
protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
gotDmaap = true;
return dmaap;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
index 662e0a7c..f4cd940c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
@@ -23,15 +23,9 @@ package org.onap.policy.drools.pooling;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import com.google.gson.JsonParseException;
-import java.util.Map;
-import java.util.TreeMap;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
@@ -44,33 +38,6 @@ public class SerializerTest {
}
@Test
- @SuppressWarnings("unchecked")
- public void testEncodeFilter() throws Exception {
- final Serializer ser = new Serializer();
-
- /*
- * Ensure raw maps serialize as expected. Use a TreeMap so the field
- * order is predictable.
- */
- Map<String, Object> top = new TreeMap<>();
- Map<String, Object> inner = new TreeMap<>();
- top.put("abc", 20);
- top.put("def", inner);
- top.put("ghi", true);
- inner.put("xyz", 30);
- assertEquals("{'abc':20,'def':{'xyz':30},'ghi':true}".replace('\'', '"'), ser.encodeFilter(top));
-
- /*
- * Ensure we can encode a complicated filter without throwing an
- * exception
- */
- Map<String, Object> complexFilter = makeAnd(makeEquals("fieldC", "valueC"),
- makeOr(makeEquals("fieldA", "valueA"), makeEquals("fieldB", "valueB")));
- String val = ser.encodeFilter(complexFilter);
- assertFalse(val.isEmpty());
- }
-
- @Test
public void testEncodeMsg_testDecodeMsg() throws Exception {
Serializer ser = new Serializer();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
deleted file mode 100644
index 0bf087a3..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.function.Function;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ClassExtractorsTest {
-
- private static final int NTIMES = 5;
-
- private static final String MY_TYPE = "theType";
- private static final String PROP_PREFIX = "extractor." + MY_TYPE + ".";
-
- private static final String VALUE = "a value";
- private static final Integer INT_VALUE = 10;
- private static final Integer INT_VALUE2 = 20;
-
- private Properties props;
- private ClassExtractors map;
-
- /**
- * Setup.
- *
- */
- @Before
- public void setUp() {
- props = new Properties();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- }
-
- @Test
- public void testExtract() {
- Simple obj = new Simple();
- assertEquals(INT_VALUE, map.extract(obj));
-
- // string value
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-
- // null object
- assertNull(map.extract(null));
-
- // values from two different kinds of objects
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Simple()));
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass method, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass field, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Sub()));
-
-
- // prefix includes trailing "."
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX.substring(0, PROP_PREFIX.length() - 1), MY_TYPE);
- assertEquals(INT_VALUE, map.extract(new Simple()));
-
-
- // values from an class in a different file
- props = new Properties();
- props.setProperty(PROP_PREFIX + ClassExtractorsTestSupport.class.getName(), "${nested.theValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(ClassExtractorsTestSupport2.NESTED_VALUE, map.extract(new ClassExtractorsTestSupport()));
- }
-
- @Test
- public void testGetExtractor() {
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertEquals("x=" + x, INT_VALUE, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testBuildExtractorClass_TopLevel() {
- // extractor defined for top-level class
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_SuperClass() {
- // extractor defined for superclass (interface)
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class and one for interface
- assertEquals(2, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_NotDefined() {
- // no extractor defined for "this" class
- assertNull(map.extract(this));
-
- // one NULL extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClassString() {
- // no leading "${"
- assertNull(tryIt(Simple.class, "intValue}", xxx -> new Simple()));
-
- // no trailing "}"
- assertNull(tryIt(Simple.class, "${intValue", xxx -> new Simple()));
-
- // leading "."
- assertNull(tryIt(Sub.class, "${.simple.strValue}", xxx -> new Sub()));
-
- // trailing "."
- assertNull(tryIt(Sub.class, "${simple.strValue.}", xxx -> new Sub()));
-
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // two components
- assertEquals(VALUE, tryIt(Sub.class, "${simple.strValue}", xxx -> new Sub()));
-
- // invalid component
- assertNull(tryIt(Sub.class, "${unknown}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InSuper() {
- // field in the superclass
- assertEquals(INT_VALUE, tryIt(Super.class, "${intValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InInterface() {
- // defined in the interface
- assertEquals(VALUE, map.extract(new Sub()));
- }
-
- @Test
- public void testNullExtractorExtract() {
- // empty properties - should only create NullExtractor
- map = new ClassExtractors(new Properties(), PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertNull("x=" + x, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testComponetizedExtractor() {
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // three components
- assertEquals(VALUE, tryIt(Sub.class, "${cont.data.strValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Method() {
- assertEquals(INT_VALUE, tryIt(Simple.class, "${intValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Field() {
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Map() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Unknown() {
- assertNull(tryIt(Simple.class, "${unknown2}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorExtract_MiddleNull() {
- // data component is null
- assertEquals(null, tryIt(Sub.class, "${cont.data.strValue}", xxx -> {
- Sub obj = new Sub();
- obj.cont.simpleValue = null;
- return obj;
- }));
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor_VoidMethod() {
- // tell it to use getVoidValue()
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${voidValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
- assertNull(map.extract(obj));
-
- assertFalse(obj.voidInvoked);
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor() {
- assertEquals(INT_VALUE, map.extract(new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetFieldExtractor() {
- // use a field
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor_MapSubclass() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- MapSubclass outer = new MapSubclass();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- /**
- * Sets a property for the given class, makes an object, and then returns
- * the value extracted.
- *
- * @param clazz class whose property is to be set
- * @param propval value to which to set the property
- * @param makeObj function to create the object whose data is to be
- * extracted
- * @return the extracted data, or {@code null} if nothing was extracted
- */
- private Object tryIt(Class<?> clazz, String propval, Function<Void, Object> makeObj) {
- Properties props = new Properties();
- props.setProperty(PROP_PREFIX + clazz.getName(), propval);
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- return map.extract(makeObj.apply(null));
- }
-
- /**
- * A Map subclass, used to verify that getMapExtractor() still handles it.
- */
- private static class MapSubclass extends TreeMap<String, Object> {
- private static final long serialVersionUID = 1L;
-
- }
-
- /**
- * A simple class.
- */
- private static class Simple {
-
- /**
- * This will not be used because getIntValue() will override it.
- */
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE2;
-
- /**
- * Used to verify retrieval via a field name.
- */
- @SuppressWarnings("unused")
- public final String strValue = VALUE;
-
- /**
- * Used to verify retrieval within maps.
- */
- @SuppressWarnings("unused")
- public Map<String, Object> mapValue = null;
-
- /**
- * {@code True} if {@link #getVoidValue()} was invoked, {@code false}
- * otherwise.
- */
- private boolean voidInvoked = false;
-
- /**
- * This function will supercede the value in the "intValue" field.
- *
- * @return INT_VALUE
- */
- @SuppressWarnings("unused")
- public Integer getIntValue() {
- return INT_VALUE;
- }
-
- /**
- * Used to verify that void functions are not invoked.
- */
- @SuppressWarnings("unused")
- public void getVoidValue() {
- voidInvoked = true;
- }
- }
-
- /**
- * Used to verify multi-component retrieval.
- */
- private static class Container {
- public Simple simpleValue = new Simple();
-
- @SuppressWarnings("unused")
- public Simple getData() {
- return simpleValue;
- }
- }
-
- /**
- * Used to verify extraction when the property refers to an interface.
- */
- private static interface WithString {
-
- String getStrValue();
- }
-
- /**
- * Used to verify retrieval within a superclass.
- */
- private static class Super implements WithString {
-
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE;
-
- @Override
- public String getStrValue() {
- return VALUE;
- }
- }
-
- /**
- * Used to verify retrieval within a subclass.
- */
- private static class Sub extends Super {
-
- @SuppressWarnings("unused")
- public final Simple simple = new Simple();
-
- /**
- * Used to verify multi-component retrieval.
- */
- public final Container cont = new Container();
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
deleted file mode 100644
index df42fe0f..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport {
-
- private ClassExtractorsTestSupport2 nested = new ClassExtractorsTestSupport2();
-
- /**
- * Constructor.
- */
- public ClassExtractorsTestSupport() {
- super();
- }
-
- public ClassExtractorsTestSupport2 getNested() {
- return nested;
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
deleted file mode 100644
index dddd2510..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport2 {
-
- public static final int NESTED_VALUE = 30;
-
- public final int theValue = NESTED_VALUE;
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
deleted file mode 100644
index aef0a925..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.onap.policy.common.utils.test.ExceptionsTester;
-
-public class ExtractorExceptionTest extends ExceptionsTester {
-
- @Test
- public void test() {
- assertEquals(5, test(ExtractorException.class));
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
deleted file mode 100644
index 7536d007..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Field field;
- private FieldExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- field = MyClass.class.getDeclaredField("value");
- ext = new FieldExtractor(field);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- obj.value = null;
- assertEquals(null, ext.extract(obj));
-
- obj.value = VALUE + "X";
- assertEquals(VALUE + "X", ext.extract(obj));
-
- // different value type
- field = MyClass.class.getDeclaredField("value2");
- ext = new FieldExtractor(field);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- private static class MyClass {
- @SuppressWarnings("unused")
- public String value = VALUE;
-
- @SuppressWarnings("unused")
- public int value2 = INT_VALUE;
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
deleted file mode 100644
index 74694579..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MapExtractorTest {
- private static final String KEY = "a.key";
- private static final String VALUE = "a.value";
-
- private MapExtractor ext;
-
- @Before
- public void setUp() {
- ext = new MapExtractor(KEY);
- }
-
- @Test
- public void testExtract_NotAMap() {
-
- // object is not a map (i.e., it's a String)
- assertNull(ext.extract(KEY));
- }
-
- @Test
- public void testExtract_MissingValue() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
-
- // object is a map, but doesn't have the key
- assertNull(ext.extract(map));
- }
-
- @Test
- public void testExtract() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
- map.put(KEY, VALUE);
-
- // object is a map and contains the key
- assertEquals(VALUE, ext.extract(map));
-
- // change to value to a different type
- map.put(KEY, 20);
- assertEquals(20, ext.extract(map));
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
deleted file mode 100644
index 41f731fb..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Method;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MethodExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Method meth;
- private MethodExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- meth = MyClass.class.getMethod("getValue");
- ext = new MethodExtractor(meth);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- meth = MyClass.class.getMethod("getNullValue");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(obj));
-
- // different value type
- meth = MyClass.class.getMethod("getIntValue");
- ext = new MethodExtractor(meth);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- @Test
- public void testExtract_InvokeEx() throws Exception {
- // invoke method that throws an exception
- meth = MyClass.class.getMethod("throwException");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(new MyClass()));
- }
-
- private static class MyClass {
-
- @SuppressWarnings("unused")
- public String getValue() {
- return VALUE;
- }
-
- @SuppressWarnings("unused")
- public int getIntValue() {
- return INT_VALUE;
- }
-
- @SuppressWarnings("unused")
- public String getNullValue() {
- return null;
- }
-
- @SuppressWarnings("unused")
- public String throwException() {
- throw new IllegalStateException("expected");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
deleted file mode 100644
index 99df69ec..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-
-public class ForwardTest extends SupportBasicMessageTester<Forward> {
- // values set by makeValidMessage()
- public static final CommInfrastructure VALID_PROTOCOL = CommInfrastructure.UEB;
- public static final int VALID_HOPS = 0;
- public static final String VALID_TOPIC = "topicA";
- public static final String VALID_PAYLOAD = "payloadA";
- public static final String VALID_REQUEST_ID = "requestIdA";
-
- /**
- * Time, in milliseconds, after which the most recent message was created.
- */
- private static long tcreateMs;
-
- public ForwardTest() {
- super(Forward.class);
- }
-
- @Test
- public void testBumpNumHops() {
- Forward msg = makeValidMessage();
-
- for (int x = 0; x < 3; ++x) {
- assertEquals("x=" + x, x, msg.getNumHops());
- msg.bumpNumHops();
- }
- }
-
- @Test
- public void testGetNumHops_testSetNumHops() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_HOPS, msg.getNumHops());
-
- msg.setNumHops(5);
- assertEquals(5, msg.getNumHops());
-
- msg.setNumHops(7);
- assertEquals(7, msg.getNumHops());
- }
-
- @Test
- public void testGetCreateTimeMs_testSetCreateTimeMs() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
-
- msg.setCreateTimeMs(1000L);
- assertEquals(1000L, msg.getCreateTimeMs());
-
- msg.setCreateTimeMs(2000L);
- assertEquals(2000L, msg.getCreateTimeMs());
- }
-
- @Test
- public void testGetProtocol_testSetProtocol() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.DMAAP);
- assertEquals(CommInfrastructure.DMAAP, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.UEB);
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
- }
-
- @Test
- public void testGetTopic_testSetTopic() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_TOPIC, msg.getTopic());
-
- msg.setTopic("topicX");
- assertEquals("topicX", msg.getTopic());
-
- msg.setTopic("topicY");
- assertEquals("topicY", msg.getTopic());
- }
-
- @Test
- public void testGetPayload_testSetPayload() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_PAYLOAD, msg.getPayload());
-
- msg.setPayload("payloadX");
- assertEquals("payloadX", msg.getPayload());
-
- msg.setPayload("payloadY");
- assertEquals("payloadY", msg.getPayload());
- }
-
- @Test
- public void testGetRequestId_testSetRequestId() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
-
- msg.setRequestId("reqX");
- assertEquals("reqX", msg.getRequestId());
-
- msg.setRequestId("reqY");
- assertEquals("reqY", msg.getRequestId());
- }
-
- @Test
- public void testIsExpired() {
- Forward msg = makeValidMessage();
-
- long tcreate = msg.getCreateTimeMs();
- assertTrue(msg.isExpired(tcreate + 1));
- assertTrue(msg.isExpired(tcreate + 10));
-
- assertFalse(msg.isExpired(tcreate));
- assertFalse(msg.isExpired(tcreate - 1));
- assertFalse(msg.isExpired(tcreate - 10));
- }
-
- @Test
- public void testCheckValidity_InvalidFields() throws Exception {
- // null source (i.e., superclass field)
- expectCheckValidityFailure(msg -> msg.setSource(null));
-
- // null protocol
- expectCheckValidityFailure(msg -> msg.setProtocol(null));
-
- // null or empty topic
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setTopic(value));
-
- // null payload
- expectCheckValidityFailure(msg -> msg.setPayload(null));
-
- // empty payload should NOT throw an exception
- Forward forward = makeValidMessage();
- forward.setPayload("");
- forward.checkValidity();
-
- // null or empty requestId
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setRequestId(value));
-
- // invalid hop count
- expectCheckValidityFailure(msg -> msg.setNumHops(-1));
- }
-
- @Override
- public Forward makeValidMessage() {
- tcreateMs = System.currentTimeMillis();
-
- Forward msg = new Forward(VALID_HOST, VALID_PROTOCOL, VALID_TOPIC, VALID_PAYLOAD, VALID_REQUEST_ID);
- msg.setChannel(VALID_CHANNEL);
-
- return msg;
- }
-
- @Override
- public void testDefaultConstructorFields(Forward msg) {
- super.testDefaultConstructorFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertEquals(0, msg.getCreateTimeMs());
- assertNull(msg.getPayload());
- assertNull(msg.getProtocol());
- assertNull(msg.getRequestId());
- assertNull(msg.getTopic());
- }
-
- @Override
- public void testValidFields(Forward msg) {
- super.testValidFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
- assertEquals(VALID_PAYLOAD, msg.getPayload());
- assertEquals(VALID_PROTOCOL, msg.getProtocol());
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
- assertEquals(VALID_TOPIC, msg.getTopic());
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index d8ed62c8..771f694e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -36,7 +36,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
@@ -44,7 +43,6 @@ import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -76,17 +74,6 @@ public class ActiveStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessHeartbeat_NullHost() {
assertNull(state.process(new Heartbeat()));
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
deleted file mode 100644
index f4eb870e..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
-import org.junit.Test;
-
-public class FilterUtilsTest {
-
- @Test
- public void testMakeEquals() {
- checkEquals("abc", "def", makeEquals("abc", "def"));
- }
-
- @Test
- public void testMakeAnd() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
-
- checkArray(CLASS_AND, 3, filter);
- checkEquals("an1", "av1", getItem(filter, 0));
- checkEquals("an2", "av2", getItem(filter, 1));
- checkEquals("an3", "av3", getItem(filter, 2));
- }
-
- @Test
- public void testMakeOr() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
-
- checkArray(CLASS_OR, 3, filter);
- checkEquals("on1", "ov1", getItem(filter, 0));
- checkEquals("on2", "ov2", getItem(filter, 1));
- checkEquals("on3", "ov3", getItem(filter, 2));
- }
-
- /**
- * Checks that the filter contains an array.
- *
- * @param expectedClassName type of filter this should represent
- * @param expectedCount number of items expected in the array
- * @param filter filter to be examined
- */
- protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
- assertEquals(expectedClassName, filter.get(JSON_CLASS));
-
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
- assertEquals(expectedCount, val.length);
- }
-
- /**
- * Checks that a map represents an "equals".
- *
- * @param name name of the field on the left side of the equals
- * @param value value on the right side of the equals
- * @param map map whose content is to be examined
- */
- protected void checkEquals(String name, String value, Map<String, Object> map) {
- assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
- assertEquals(name, map.get(JSON_FIELD));
- assertEquals(value, map.get(JSON_VALUE));
- }
-
- /**
- * Gets a particular sub-filter from the array contained within a filter.
- *
- * @param filter containing filter
- * @param index index of the sub-filter of interest
- * @return the sub-filter with the given index
- */
- @SuppressWarnings("unchecked")
- protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
-
- return (Map<String, Object>) val[index];
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
index 9e3ddcf9..5cc88d3a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,21 +21,18 @@
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -54,26 +51,6 @@ public class IdleStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
verifyNothingPublished();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
index ab468a1c..f8f70461 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,14 +29,12 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
public class InactiveStateTest extends SupportBasicStateTester {
@@ -56,17 +54,6 @@ public class InactiveStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessLeader() {
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
index 7dc7b2fd..e7c9db72 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -59,17 +58,6 @@ public class ProcessingStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessQuery() {
State next = mock(State.class);
when(mgr.goQuery()).thenReturn(next);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index aa999b5d..70abb96a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,14 +32,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
public class QueryStateTest extends SupportBasicStateTester {
@@ -58,17 +56,6 @@ public class QueryStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testGoQuery() {
assertNull(state.goQuery());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 142fbf7e..3d64687f 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -31,16 +31,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -60,24 +57,6 @@ public class StartStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-
- // get the sub-filter
- filter = utils.getItem(filter, 1);
-
- utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
- utils.getItem(filter, 1));
- }
-
- @Test
public void testStart() {
state.start();
@@ -142,15 +121,6 @@ public class StartStateTest extends SupportBasicStateTester {
}
@Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
Heartbeat msg = new Heartbeat();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index 5284ed11..87868a76 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -31,17 +31,14 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -125,17 +122,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testStart() {
assertThatCode(() -> state.start()).doesNotThrowAnyException();
}
@@ -213,19 +199,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testProcessForward() {
- Forward msg = new Forward();
- assertNull(state.process(msg));
-
- verify(mgr, never()).handle(msg);
-
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat()));
}
@@ -341,16 +314,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testPublishStringForward() {
- String chnl = "channelF";
- Forward msg = new Forward();
-
- state.publish(chnl, msg);
-
- verify(mgr).publish(chnl, msg);
- }
-
- @Test
public void testPublishStringHeartbeat() {
String chnl = "channelH";
Heartbeat msg = new Heartbeat();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
index 1a65c802..18d77ba7 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
@@ -143,12 +142,7 @@ public class SupportBasicStateTester {
when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
- prevState = new State(mgr) {
- @Override
- public Map<String, Object> getFilter() {
- throw new UnsupportedOperationException("cannot filter");
- }
- };
+ prevState = new State(mgr) {};
// capture publish() arguments
doAnswer(invocation -> {