summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java33
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java54
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java10
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java221
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java458
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java36
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java58
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java61
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java22
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java179
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java49
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java42
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java88
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java32
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java348
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java33
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java445
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java40
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java31
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java35
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java78
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java74
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java100
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java214
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java13
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java110
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java31
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java15
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java12
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java15
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java30
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java37
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java8
39 files changed, 172 insertions, 3135 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
index 2786a613..59c4b472 100644
--- a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-pooling-dmaap
# ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,34 +65,23 @@
# Topic used for inter-host communication for a particular controller
# pooling.<controller-name>.topic=XXX
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
# Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties. However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties. However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
# the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 9ba844ed..08c82fea 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
@@ -46,7 +45,7 @@ public class DmaapManager {
/**
* Topic source whose filter is to be manipulated.
*/
- private final FilterableTopicSource topicSource;
+ private final TopicSource topicSource;
/**
* Where to publish messages.
@@ -79,9 +78,6 @@ public class DmaapManager {
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
- // verify that we can set the filter
- setFilter(null);
-
} catch (IllegalArgumentException e) {
logger.error("failed to attach to topic {}", topic);
throw new PoolingFeatureException(e);
@@ -98,15 +94,10 @@ public class DmaapManager {
* @return the topic source
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
- private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ private TopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : getTopicSources()) {
if (topic.equals(src.getTopic())) {
- if (src instanceof FilterableTopicSource) {
- return (FilterableTopicSource) src;
-
- } else {
- throw new PoolingFeatureException("topic source " + topic + " is not filterable");
- }
+ return src;
}
}
@@ -199,22 +190,6 @@ public class DmaapManager {
}
/**
- * Sets the server-side filter to be used by the consumer.
- *
- * @param filter the filter string, or {@code null} if no filter is to be used
- * @throws PoolingFeatureException if the topic is not filterable
- */
- public void setFilter(String filter) throws PoolingFeatureException {
- try {
- logger.debug("change filter for topic {} to {}", topic, filter);
- topicSource.setFilter(filter);
-
- } catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic, e);
- }
- }
-
- /**
* Publishes a message to the sink.
*
* @param msg message to be published
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 00b7a215..c7574925 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -77,11 +77,11 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
- * later. As multiple threads can be active within the methods at the same time, we must keep
- * this in thread local storage.
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
- private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
/**
* Constructor.
@@ -236,19 +236,19 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- if (mgr.beforeOffer(protocol, topic2, event)) {
+ if (mgr.beforeOffer(topic2, event)) {
return true;
}
- offerArgs.set(new OfferArgs(protocol, topic2, event));
+ offerTopics.set(topic2);
return false;
}
@Override
public boolean beforeInsert(DroolsController droolsController, Object fact) {
- OfferArgs args = offerArgs.get();
- if (args == null) {
+ String topic = offerTopics.get();
+ if (topic == null) {
logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
@@ -279,7 +279,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ return mgr.beforeInsert(topic, fact);
}
@Override
@@ -287,7 +287,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean success) {
// clear any stored arguments
- offerArgs.remove();
+ offerTopics.remove();
return false;
}
@@ -345,40 +345,6 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
- /**
- * Arguments captured from beforeOffer().
- */
- private static class OfferArgs {
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event text that was received on the topic.
- */
- private String event;
-
- /**
- * Constructor.
- *
- * @param protocol protocol
- * @param topic topic
- * @param event the actual event data received on the topic
- */
- public OfferArgs(CommInfrastructure protocol, String topic, String event) {
- this.protocol = protocol;
- this.topic = topic;
- this.event = event;
- }
- }
-
/*
* The remaining methods may be overridden by junit tests.
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index e3576b8f..cc8e3a4d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
@@ -84,13 +83,6 @@ public interface PoolingManager {
void publish(String channel, Message msg);
/**
- * Handles a {@link Forward} event that was received from the internal topic.
- *
- * @param event event
- */
- void handle(Forward event);
-
- /**
* Schedules a timer to fire after a delay.
*
* @param delayMs delay, in milliseconds
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index b2966101..9093c7c0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling;
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
- /**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
private final CountDownLatch activeLatch;
@@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final DmaapManager dmaapMgr;
/**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
- /**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
@@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
- /**
* Constructs the manager, initializing all of the data structures.
*
* @param host name/uuid of this host
@@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*/
@@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
@@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
@@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
/**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
- /**
* Decodes an event from a String into an event Object.
*
* @param topic2 topic
@@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
- /**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
* decoded.
@@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index 80eee501..4627b9eb 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import java.util.HashMap;
import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
@@ -60,7 +59,6 @@ public class Serializer {
private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
static {
- class2type.put(Forward.class, "forward");
class2type.put(Heartbeat.class, "heartbeat");
class2type.put(Identification.class, "identification");
class2type.put(Leader.class, "leader");
@@ -79,7 +77,7 @@ public class Serializer {
/**
* Encodes a filter.
- *
+ *
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
*/
@@ -89,7 +87,7 @@ public class Serializer {
/**
* Encodes a message.
- *
+ *
* @param msg message to be encoded
* @return the message, serialized as a JSON string
*/
@@ -108,7 +106,7 @@ public class Serializer {
/**
* Decodes a JSON string into a Message.
- *
+ *
* @param msg JSON string representing the message
* @return the message
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
deleted file mode 100644
index bd75995f..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
- private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
- /**
- * Properties that specify how the data is to be extracted from a given
- * class.
- */
- private final Properties properties;
-
- /**
- * Property prefix, including a trailing ".".
- */
- private final String prefix;
-
- /**
- * Type of item to be extracted.
- */
- private final String type;
-
- /**
- * Maps the class name to its extractor.
- */
- private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param props properties that specify how the data is to be extracted from
- * a given class
- * @param prefix property name prefix, prepended before the class name
- * @param type type of item to be extracted
- */
- public ClassExtractors(Properties props, String prefix, String type) {
- this.properties = props;
- this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
- this.type = type;
- }
-
- /**
- * Gets the number of extractors in the map.
- *
- * @return gets the number of extractors in the map
- */
- protected int size() {
- return class2extractor.size();
- }
-
- /**
- * Extracts the desired data item from an object.
- *
- * @param object object from which to extract the data item
- * @return the extracted item, or {@code null} if it could not be extracted
- */
- public Object extract(Object object) {
- if (object == null) {
- return null;
- }
-
- Extractor ext = getExtractor(object);
-
- return ext.extract(object);
- }
-
- /**
- * Gets the extractor for the given type of object, creating one if it
- * doesn't exist yet.
- *
- * @param object object whose extracted is desired
- * @return an extractor for the object
- */
- private Extractor getExtractor(Object object) {
- Class<?> clazz = object.getClass();
- Extractor ext = class2extractor.get(clazz.getName());
-
- if (ext == null) {
- // allocate a new extractor, if another thread doesn't beat us to it
- ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
-
- return ext;
- }
-
- /**
- * Builds an extractor for the class.
- *
- * @param clazz class for which the extractor should be built
- *
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz) {
- String value = properties.getProperty(prefix + clazz.getName(), null);
- if (value != null) {
- // property has config info for this class - build the extractor
- return buildExtractor(clazz, value);
- }
-
- /*
- * Get the extractor, if any, for the super class or interfaces, but
- * don't add one if it doesn't exist
- */
- Extractor ext = getClassExtractor(clazz, false);
- if (ext != null) {
- return ext;
- }
-
- /*
- * No extractor defined for for this class or its super class - we
- * cannot extract data items from objects of this type, so just
- * allocated a null extractor.
- */
- logger.warn("missing property {}{}", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- /**
- * Builds an extractor for the class, based on the config value extracted
- * from the corresponding property.
- *
- * @param clazz class for which the extractor should be built
- * @param value config value (e.g., "${event.request.id}"
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz, String value) {
- if (!value.startsWith("${")) {
- logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
- return new NullExtractor();
- }
-
- if (!value.endsWith("}")) {
- logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // get the part in the middle
- String val = value.substring(2, value.length() - 1);
- if (val.startsWith(".")) {
- logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- if (val.endsWith(".")) {
- logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // everything's valid - create the extractor
- try {
- ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
- /*
- * If there's only one extractor, then just return it, otherwise
- * return the whole extractor.
- */
- return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
- } catch (ExtractorException e) {
- logger.warn("cannot build extractor for {}", clazz.getName(), e);
- return new NullExtractor();
- }
- }
-
- /**
- * Gets the extractor for a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose extractor is desired
- * @param addOk {@code true} if the extractor may be added, provided the
- * property is defined, {@code false} otherwise
- * @return the extractor to be used for the class, or {@code null} if no
- * extractor has been defined yet
- */
- private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
- if (clazz == null) {
- return null;
- }
-
- Extractor ext = null;
-
- if (addOk) {
- String val = properties.getProperty(prefix + clazz.getName(), null);
-
- if (val != null) {
- /*
- * A property is defined for this class, so create the extractor
- * for it.
- */
- return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
- }
-
- // see if the superclass has an extractor
- if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
- return ext;
- }
-
- // check the interfaces, too
- for (Class<?> clz : clazz.getInterfaces()) {
- if ((ext = getClassExtractor(clz, true)) != null) {
- break;
- }
- }
-
- return ext;
- }
-
- /**
- * Extractor that always returns {@code null}. Used when no extractor could
- * be built for a given object type.
- */
- private class NullExtractor implements Extractor {
-
- @Override
- public Object extract(Object object) {
- logger.info("cannot extract {} from {}", type, object.getClass());
- return null;
- }
- }
-
- /**
- * Component-ized extractor. Extracts an object that is referenced
- * hierarchically, where each name identifies a particular component within
- * the hierarchy. Supports retrieval from {@link Map} objects, as well as
- * via getXxx() methods, or by direct field retrieval.
- *
- * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
- */
- private class ComponetizedExtractor implements Extractor {
-
- /**
- * Extractor for each component.
- */
- private final Extractor[] extractors;
-
- /**
- * Constructor.
- *
- * @param clazz the class associated with the object at the root of the
- * hierarchy
- * @param names name associated with each component
- * @throws ExtractorException extractor exception
- */
- public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
- this.extractors = new Extractor[names.length];
-
- Class<?> clz = clazz;
-
- for (int x = 0; x < names.length; ++x) {
- String comp = names[x];
-
- Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
- extractors[x] = pair.getLeft();
- clz = pair.getRight();
- }
- }
-
- /**
- * Builds an extractor for the given component of an object.
- *
- * @param clazz type of object from which the component will be
- * extracted
- * @param comp name of the component to extract
- * @return a pair containing the extractor and the extracted object's
- * type
- * @throws ExtractorException extrator exception
- */
- private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
- Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
- if (pair == null) {
- pair = getFieldExtractor(clazz, comp);
- }
-
- if (pair == null) {
- pair = getMapExtractor(clazz, comp);
- }
-
-
- // didn't find an extractor
- if (pair == null) {
- throw new ExtractorException("class " + clazz + " contains no element " + comp);
- }
-
- return pair;
- }
-
- @Override
- public Object extract(Object object) {
- Object obj = object;
-
- for (Extractor ext : extractors) {
- if (obj == null) {
- break;
- }
-
- obj = ext.extract(obj);
- }
-
- return obj;
- }
-
- /**
- * Gets an extractor that invokes a getXxx() method to retrieve the
- * object.
- *
- * @param clazz container's class
- * @param name name of the property to be retrieved
- * @return a new extractor, or {@code null} if the class does not
- * contain the corresponding getXxx() method
- * @throws ExtractorException if the getXxx() method is inaccessible
- */
- private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
- Method meth;
-
- String nm = "get" + StringUtils.capitalize(name);
-
- try {
- meth = clazz.getMethod(nm);
-
- Class<?> retType = meth.getReturnType();
- if (retType == void.class) {
- // it's a void method, thus it won't return an object
- return null;
- }
-
- return Pair.of(new MethodExtractor(meth), retType);
-
- } catch (NoSuchMethodException expected) {
- // no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
- }
- }
-
- /**
- * Gets an extractor for a field within the object.
- *
- * @param clazz container's class
- * @param name name of the field whose value is to be extracted
- * @return a new extractor, or {@code null} if the class does not
- * contain the given field
- * @throws ExtractorException if the field is inaccessible
- */
- private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
- Field field = getClassField(clazz, name);
- if (field == null) {
- return null;
- }
-
- return Pair.of(new FieldExtractor(field), field.getType());
- }
-
- /**
- * Gets an extractor for an item within a Map object.
- *
- * @param clazz container's class
- * @param key item key within the map
- * @return a new extractor, or {@code null} if the class is not a Map
- * subclass
- */
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
- if (!Map.class.isAssignableFrom(clazz)) {
- return null;
- }
-
- /*
- * Don't know the value's actual type, so we'll assume it's a Map
- * for now. Things should still work OK, as this is only used to
- * direct the constructor on what type of extractor to create next.
- * If the object turns out not to be a map, then the MapExtractor
- * for the next component will just return null.
- */
- return Pair.of(new MapExtractor(key), Map.class);
- }
-
- /**
- * Gets field within a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose field is desired
- * @param name name of the desired field
- * @return the field within the class, or {@code null} if the field does
- * not exist
- * @throws ExtractorException if the field is inaccessible
- */
- private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
- if (clazz == null) {
- return null;
- }
-
- try {
- return clazz.getField(name);
-
- } catch (NoSuchFieldException expected) {
- // no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
- }
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
deleted file mode 100644
index 5e02d602..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
- /**
- * Extracts an object contained within another object.
- *
- * @param object object from which to extract the contained object
- * @return the extracted value, or {@code null} if it cannot be extracted
- */
- Object extract(Object object);
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
deleted file mode 100644
index a864672b..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public ExtractorException() {
- super();
- }
-
- public ExtractorException(String message) {
- super(message);
- }
-
- public ExtractorException(Throwable cause) {
- super(cause);
- }
-
- public ExtractorException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
deleted file mode 100644
index 9389ab22..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
- /**
- * Field containing the object.
- */
- private final Field field;
-
- /**
- * Constructor.
- *
- * @param field field containing the object
- */
- public FieldExtractor(Field field) {
- this.field = field;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return field.get(object);
-
- } catch (IllegalAccessException | IllegalArgumentException e) {
- logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
deleted file mode 100644
index 9c5be5ff..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
- /**
- * Key to the item to extract from the map.
- */
- private final String key;
-
- /**
- * Constructor.
- *
- * @param key key to the item to extract from the map
- */
- public MapExtractor(String key) {
- this.key = key;
- }
-
- @Override
- public Object extract(Object object) {
-
- if (object instanceof Map) {
- Map<?, ?> map = (Map<?, ?>) object;
-
- return map.get(key);
-
- } else {
- logger.warn("expecting a map, instead of {}", object.getClass());
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
deleted file mode 100644
index 3efef5ec..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
- /**
- * Method to invoke to extract the contained object.
- */
- private final Method method;
-
- /**
- * Constructor.
- *
- * @param method method to invoke to extract the contained object
- */
- public MethodExtractor(Method method) {
- this.method = method;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return method.invoke(object);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index 6be080f7..c78fb17b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,7 +63,7 @@ public class BucketAssignments {
/**
* Constructor.
- *
+ *
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
*/
@@ -81,7 +81,7 @@ public class BucketAssignments {
/**
* Gets the leader, which is the host with the minimum UUID.
- *
+ *
* @return the assignment leader
*/
public String getLeader() {
@@ -103,7 +103,7 @@ public class BucketAssignments {
/**
* Determines if a host has an assignment.
- *
+ *
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
@@ -123,7 +123,7 @@ public class BucketAssignments {
/**
* Gets all of the hosts that have an assignment.
- *
+ *
* @return all of the hosts that have an assignment
*/
public Set<String> getAllHosts() {
@@ -143,7 +143,7 @@ public class BucketAssignments {
/**
* Gets the host assigned to a given bucket.
- *
+ *
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
@@ -153,12 +153,12 @@ public class BucketAssignments {
return null;
}
- return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
* Gets the number of buckets.
- *
+ *
* @return the number of buckets
*/
public int size() {
@@ -168,7 +168,7 @@ public class BucketAssignments {
/**
* Checks the validity of the assignments, verifying that all buckets have been
* assigned to a host.
- *
+ *
* @throws PoolingFeatureException if the assignments are invalid
*/
public void checkValidity() throws PoolingFeatureException {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
deleted file mode 100644
index 3c34e971..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
- /**
- * Number of hops (i.e., number of times it's been forwarded) so far.
- */
- private int numHops;
-
- /**
- * Time, in milliseconds, at which the message was created.
- */
- private long createTimeMs;
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event pay load that was received on the topic.
- */
- private String payload;
-
- /**
- * The request id that was extracted from the event.
- */
- private String requestId;
-
- /**
- * Constructor.
- */
- public Forward() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param source host on which the message originated
- * @param protocol protocol
- * @param topic topic
- * @param payload the actual event data received on the topic
- * @param requestId request id
- */
- public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
- super(source);
-
- this.numHops = 0;
- this.createTimeMs = System.currentTimeMillis();
- this.protocol = protocol;
- this.topic = topic;
- this.payload = payload;
- this.requestId = requestId;
- }
-
- /**
- * Increments {@link #numHops}.
- */
- public void bumpNumHops() {
- ++numHops;
- }
-
- public int getNumHops() {
- return numHops;
- }
-
- public void setNumHops(int numHops) {
- this.numHops = numHops;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public void setCreateTimeMs(long createTimeMs) {
- this.createTimeMs = createTimeMs;
- }
-
- public CommInfrastructure getProtocol() {
- return protocol;
- }
-
- public void setProtocol(CommInfrastructure protocol) {
- this.protocol = protocol;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public boolean isExpired(long minCreateTimeMs) {
- return (createTimeMs < minCreateTimeMs);
-
- }
-
- @Override
- public void checkValidity() throws PoolingFeatureException {
-
- super.checkValidity();
-
- if (protocol == null) {
- throw new PoolingFeatureException("missing message protocol");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new PoolingFeatureException("missing message topic");
- }
-
- /*
- * Note: an empty pay load is OK, as an empty message could have been
- * received on the topic.
- */
-
- if (payload == null) {
- throw new PoolingFeatureException("missing message payload");
- }
-
- if (requestId == null || requestId.isEmpty()) {
- throw new PoolingFeatureException("missing message requestId");
- }
-
- if (numHops < 0) {
- throw new PoolingFeatureException("invalid message hop count");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
deleted file mode 100644
index 5b7012d2..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
- // message element names
- public static final String MSG_CHANNEL = "channel";
- public static final String MSG_TIMESTAMP = "timestampMs";
-
- // json element names
- protected static final String JSON_CLASS = "class";
- protected static final String JSON_FILTERS = "filters";
- protected static final String JSON_FIELD = "field";
- protected static final String JSON_VALUE = "value";
-
- // values to be stuck into the "class" element
- protected static final String CLASS_OR = "Or";
- protected static final String CLASS_AND = "And";
- protected static final String CLASS_EQUALS = "Equals";
-
- /**
- * Constructor.
- */
- private FilterUtils() {
- super();
- }
-
- /**
- * Makes a filter that verifies that a field equals a value.
- *
- * @param field name of the field to check
- * @param value desired value
- * @return a map representing an "equals" filter
- */
- public static Map<String, Object> makeEquals(String field, String value) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_EQUALS);
- map.put(JSON_FIELD, field);
- map.put(JSON_VALUE, value);
-
- return map;
- }
-
- /**
- * Makes an "and" filter, where all of the items must be true.
- *
- * @param items items to be checked
- * @return an "and" filter
- */
- public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_AND);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-
- /**
- * Makes an "or" filter, where at least one of the items must be true.
- *
- * @param items items to be checked
- * @return an "or" filter
- */
- public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_OR);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index 59d264ec..c5761bfd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,16 +20,8 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +41,7 @@ public class StartState extends State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
@@ -58,7 +50,7 @@ public class StartState extends State {
/**
* Get Heart beat time stamp in milliseconds.
- *
+ *
* @return the time stamp inserted into the heart beat message
*/
public long getHbTimestampMs() {
@@ -110,14 +102,4 @@ public class StartState extends State {
return null;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getFilter() {
- // ignore everything except our most recent heart beat message
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
- makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index f2277be3..853c24d4 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -20,22 +20,15 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
@@ -71,18 +64,6 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic. The
- * default method returns a filter that accepts messages on the admin channel and on
- * the host's own channel.
- *
- * @return the server-side filter to use.
- */
- @SuppressWarnings("unchecked")
- public Map<String, Object> getFilter() {
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
- }
-
- /**
* Cancels the timers added by this state.
*/
public final void cancelTimers() {
@@ -142,26 +123,6 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to handle and
- * returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Forward msg) {
- if (getHost().equals(msg.getChannel())) {
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
-
- } else {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
- getTopic());
- }
-
- return null;
- }
-
- /**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
@@ -293,16 +254,6 @@ public abstract class State {
* @param channel channel
* @param msg message to be published
*/
- protected final void publish(String channel, Forward msg) {
- mgr.publish(channel, msg);
- }
-
- /**
- * Publishes a message on the specified channel.
- *
- * @param channel channel
- * @param msg message to be published
- */
protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index 7f73a702..ec554fc9 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -48,10 +47,9 @@ public class DmaapManagerTest {
private static final String EXPECTED = "expected";
private static final String MY_TOPIC = "my.topic";
private static final String MSG = "a message";
- private static final String FILTER = "a filter";
private TopicListener listener;
- private FilterableTopicSource source;
+ private TopicSource source;
private boolean gotSources;
private TopicSink sink;
private boolean gotSinks;
@@ -65,7 +63,7 @@ public class DmaapManagerTest {
@Before
public void setUp() throws Exception {
listener = mock(TopicListener.class);
- source = mock(FilterableTopicSource.class);
+ source = mock(TopicSource.class);
gotSources = false;
sink = mock(TopicSink.class);
gotSinks = false;
@@ -104,35 +102,12 @@ public class DmaapManagerTest {
};
}
- @Test(expected = PoolingFeatureException.class)
- public void testDmaapManager_CannotFilter() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- new DmaapManagerImpl(MY_TOPIC);
- }
-
@Test
public void testGetTopic() {
assertEquals(MY_TOPIC, mgr.getTopic());
}
@Test(expected = PoolingFeatureException.class)
- public void testFindTopicSource_NotFilterableTopicSource() throws PoolingFeatureException {
-
- // matching topic, but doesn't have the correct interface
- TopicSource source2 = mock(TopicSource.class);
- when(source2.getTopic()).thenReturn(MY_TOPIC);
-
- new DmaapManagerImpl(MY_TOPIC) {
- @Override
- protected List<TopicSource> getTopicSources() {
- return Arrays.asList(source2);
- }
- };
- }
-
- @Test(expected = PoolingFeatureException.class)
public void testFindTopicSource_NotFound() throws PoolingFeatureException {
// one item in list, and its topic doesn't match
new DmaapManagerImpl(MY_TOPIC) {
@@ -275,19 +250,6 @@ public class DmaapManagerTest {
}
@Test
- public void testSetFilter() throws PoolingFeatureException {
- assertThatCode(() -> mgr.setFilter(FILTER)).doesNotThrowAnyException();
- }
-
- @Test(expected = PoolingFeatureException.class)
- public void testSetFilter_Exception() throws PoolingFeatureException {
- // force an error when setFilter() is called
- doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
- mgr.setFilter(FILTER);
- }
-
- @Test
public void testPublish() throws PoolingFeatureException {
// cannot publish before starting
assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index 96b358da..efab636b 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,20 +47,19 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
@@ -216,10 +215,6 @@ public class FeatureTest {
*/
private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
/**
- * Queue for the external "DMaaP" topic.
- */
- private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
- /**
* Counts the number of decode errors.
*/
private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
@@ -306,13 +301,16 @@ public class FeatureTest {
}
/**
- * Offers an event to the external topic.
+ * Offers an event to the external topic. As each host needs a copy, it is posted
+ * to each Host's queue.
*
* @param event event
*/
public void offerExternal(String event) {
- externalTopic.offer(event);
+ for (Host host : hosts) {
+ host.getExternalTopic().offer(event);
+ }
}
/**
@@ -337,20 +335,6 @@ public class FeatureTest {
}
/**
- * Offers amessage to an internal channel.
- *
- * @param channel channel
- * @param message message
- */
-
- public void offerInternal(String channel, String message) {
- BlockingQueue<String> queue = channel2queue.get(channel);
- if (queue != null) {
- queue.offer(message);
- }
- }
-
- /**
* Associates a controller with its drools controller.
*
* @param controller controller
@@ -374,16 +358,6 @@ public class FeatureTest {
}
/**
- * Constructor.
- *
- * @return queue for the external topic
- */
-
- public BlockingQueue<String> getExternalTopic() {
- return externalTopic;
- }
-
- /**
* Get decode errors.
*
* @return the number of decode errors so far
@@ -465,6 +439,12 @@ public class FeatureTest {
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
+ * Queue for the external "DMaaP" topic.
+ */
+ @Getter
+ private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
+ /**
* Source that reads from the external topic and posts to the listener.
*/
@@ -493,7 +473,7 @@ public class FeatureTest {
doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
context.addController(controller, drools);
// arrange to read from the external topic
- externalSource = new TopicSourceImpl(context, false);
+ externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
feature = new PoolingFeatureImpl(context);
}
@@ -666,10 +646,6 @@ public class FeatureTest {
private static class TopicSinkImpl extends TopicImpl implements TopicSink {
private final Context context;
- /**
- * Used to decode the messages so that the channel can be extracted.
- */
- private final Serializer serializer = new Serializer();
/**
* Constructor.
@@ -687,15 +663,7 @@ public class FeatureTest {
return false;
}
try {
- Message msg = serializer.decodeMsg(message);
- String channel = msg.getChannel();
- if (Message.ADMIN.equals(channel)) {
- // add to every queue
- context.offerInternal(message);
- } else {
- // add to a specific queue
- context.offerInternal(channel, message);
- }
+ context.offerInternal(message);
return true;
} catch (JsonParseException e) {
logger.warn("could not decode message: {}", message);
@@ -709,7 +677,7 @@ public class FeatureTest {
* Source implementation that reads from a queue associated with a topic.
*/
- private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+ private static class TopicSourceImpl extends TopicImpl implements TopicSource {
private final String topic;
/**
@@ -726,24 +694,13 @@ public class FeatureTest {
/**
* Constructor.
*
- * @param context context
- * @param internal {@code true} if to read from the internal topic, {@code false}
- * to read from the external topic
+ * @param type topic type
+ * @param queue topic from which to read
*/
- public TopicSourceImpl(Context context, boolean internal) {
- if (internal) {
- this.topic = INTERNAL_TOPIC;
- this.queue = context.getCurrentHost().getInternalQueue();
- } else {
- this.topic = EXTERNAL_TOPIC;
- this.queue = context.getExternalTopic();
- }
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("topic filter set to: {}", filter);
+ public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+ this.topic = type;
+ this.queue = queue;
}
@Override
@@ -1056,7 +1013,8 @@ public class FeatureTest {
@Override
protected List<TopicSource> getTopicSources() {
- return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+ return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+ currentContext.get().getCurrentHost().getInternalQueue()));
}
@Override
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 63bfc11e..02a4db5c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -298,20 +298,20 @@ public class PoolingFeatureTest {
@Test
public void testBeforeOffer() {
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure that the args were captured
pool.beforeInsert(drools1, OBJECT1);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
// ensure that the new args were captured
pool.beforeInsert(drools1, OBJECT2);
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
@@ -326,14 +326,14 @@ public class PoolingFeatureTest {
public void testBeforeOffer_MgrTrue() {
// manager will return true
- when(mgr1.beforeOffer(any(), any(), any())).thenReturn(true);
+ when(mgr1.beforeOffer(any(), any())).thenReturn(true);
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+ verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure it's still in the map by re-invoking
assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
- verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+ verify(mgr1).beforeOffer(TOPIC2, EVENT2);
assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
}
@@ -342,12 +342,12 @@ public class PoolingFeatureTest {
public void testBeforeInsert() {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+ verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(drools1, OBJECT2));
- verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+ verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
@@ -358,10 +358,10 @@ public class PoolingFeatureTest {
// call beforeInsert without beforeOffer
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -376,7 +376,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -391,7 +391,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -407,7 +407,7 @@ public class PoolingFeatureTest {
pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
}
@Test
@@ -426,7 +426,7 @@ public class PoolingFeatureTest {
assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
assertFalse(pool.beforeInsert(drools1, OBJECT1));
- verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+ verify(mgr1, never()).beforeInsert(any(), any());
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 2a0066b7..21bd62d1 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -25,10 +25,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -49,9 +47,7 @@ import org.mockito.ArgumentCaptor;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,7 +79,6 @@ public class PoolingManagerImplTest {
private static final String THE_EVENT = "the event";
private static final Object DECODED_EVENT = new Object();
- private static final String REQUEST_ID = "my.request.id";
/**
* Number of dmaap.publish() invocations that should be issued when the manager is
@@ -98,7 +93,6 @@ public class PoolingManagerImplTest {
private PoolingProperties poolProps;
private ListeningController controller;
- private ClassExtractors extractors;
private DmaapManager dmaap;
private boolean gotDmaap;
private ScheduledThreadPoolExecutor sched;
@@ -132,7 +126,6 @@ public class PoolingManagerImplTest {
ser = new Serializer();
active = new CountDownLatch(1);
- extractors = mock(ClassExtractors.class);
dmaap = mock(DmaapManager.class);
gotDmaap = false;
controller = mock(ListeningController.class);
@@ -140,8 +133,6 @@ public class PoolingManagerImplTest {
schedCount = 0;
drools = mock(DroolsController.class);
- when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
-
when(controller.getName()).thenReturn(MY_CONTROLLER);
when(controller.getDrools()).thenReturn(drools);
when(controller.isAlive()).thenReturn(true);
@@ -176,18 +167,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testPoolingManagerImpl_ClassEx() {
- /*
- * this controller does not implement TopicListener, which should cause a
- * ClassCastException
- */
- PolicyController ctlr = mock(PolicyController.class);
-
- assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, ctlr, poolProps, active))
- .isInstanceOf(PoolingFeatureRtException.class).hasCauseInstanceOf(ClassCastException.class);
- }
-
- @Test
public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
// throw an exception when we try to create the dmaap manager
PoolingFeatureException ex = new PoolingFeatureException();
@@ -284,23 +263,20 @@ public class PoolingManagerImplTest {
startMgr();
mgr.startDistributing(makeAssignments(false));
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ verify(dmaap, times(START_PUB)).publish(any());
mgr.beforeStop();
verify(dmaap).stopConsumer(mgr);
verify(sched).shutdownNow();
- verify(dmaap, times(START_PUB + 2)).publish(any());
+ verify(dmaap, times(START_PUB + 1)).publish(any());
verify(dmaap).publish(contains("offline"));
assertTrue(mgr.getCurrent() instanceof IdleState);
// verify that next message is handled locally
- mgr.handle(msg);
- verify(dmaap, times(START_PUB + 2)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+ verify(dmaap, times(START_PUB + 1)).publish(any());
}
@Test
@@ -407,19 +383,11 @@ public class PoolingManagerImplTest {
// start should invoke changeState()
startMgr();
- int ntimes = 0;
-
- // should have set the filter for the StartState
- verify(dmaap, times(++ntimes)).setFilter(any());
-
/*
* now go offline while it's locked
*/
lockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// should have cancelled the timers
assertEquals(2, futures.size());
verify(futures.poll()).cancel(false);
@@ -430,9 +398,6 @@ public class PoolingManagerImplTest {
*/
unlockMgr();
- // should have set the new filter
- verify(dmaap, times(++ntimes)).setFilter(any());
-
// new timers should now be active
assertEquals(2, futures.size());
verify(futures.poll(), never()).cancel(false);
@@ -440,26 +405,6 @@ public class PoolingManagerImplTest {
}
@Test
- public void testSetFilter() throws Exception {
- // start should cause a filter to be set
- startMgr();
-
- verify(dmaap).setFilter(any());
- }
-
- @Test
- public void testSetFilter_DmaapEx() throws Exception {
-
- // generate an exception
- doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
-
- // start should invoke setFilter()
- assertThatCode(() -> startMgr()).doesNotThrowAnyException();
-
- // no exception, means success
- }
-
- @Test
public void testSchedule() throws Exception {
// must start the scheduler
startMgr();
@@ -583,64 +528,35 @@ public class PoolingManagerImplTest {
}
@Test
- public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+ public void testBeforeOffer_Unlocked() throws Exception {
startMgr();
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testBeforeOffer_Locked_NoIntercept() throws Exception {
- startMgr();
-
- lockMgr();
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeOffer_Locked_Intercept() throws Exception {
+ public void testBeforeOffer_Locked() throws Exception {
startMgr();
lockMgr();
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(false);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+ // route the message to another host
+ mgr.startDistributing(makeAssignments(false));
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
- public void testBeforeInsert_Intercept() throws Exception {
+ public void testBeforeInsert() throws Exception {
startMgr();
lockMgr();
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testBeforeInsert_NoIntercept() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
@@ -657,17 +573,17 @@ public class PoolingManagerImplTest {
public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
startMgr();
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
}
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
- validateUnhandled(CommInfrastructure.UEB);
+ validateUnhandled();
}
@Test
@@ -678,7 +594,7 @@ public class PoolingManagerImplTest {
@Test
public void testHandleEvent_NullTarget() throws Exception {
// buckets have null targets
- validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
+ validateDiscarded(new BucketAssignments(new String[] {null, null}));
}
@Test
@@ -687,46 +603,9 @@ public class PoolingManagerImplTest {
}
@Test
- public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
- mgr.handle(msg);
-
- // shouldn't publish
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandleEvent_DiffHost_Forward() throws Exception {
- validateHandled(makeAssignments(false), START_PUB + 1);
- }
-
- @Test
- public void testExtractRequestId_NullEvent() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
- }
-
- @Test
- public void testExtractRequestId_NullReqId() throws Exception {
- validateHandleReqId(null);
- }
-
- @Test
- public void testExtractRequestId() throws Exception {
- startMgr();
-
+ public void testHandleEvent_DiffHost() throws Exception {
// route the message to the *OTHER* host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateDiscarded(makeAssignments(false));
}
@Test
@@ -746,7 +625,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -767,7 +646,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -787,7 +666,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -807,7 +686,7 @@ public class PoolingManagerImplTest {
// create assignments, though they are irrelevant
mgr.startDistributing(makeAssignments(false));
- assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+ assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -819,101 +698,7 @@ public class PoolingManagerImplTest {
// route to another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
- }
-
- @Test
- public void testMakeForward() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- }
-
- @Test
- public void testMakeForward_InvalidMsg() throws Exception {
- startMgr();
-
- // route the message to another host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- // should not have tried to publish a message
- verify(dmaap, times(START_PUB)).publish(any());
- }
-
- @Test
- public void testHandle_SameHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testHandle_DiffHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(false));
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
- verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
- }
-
- @Test
- public void testInject() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testInject_Ex() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- // generate RuntimeException when onTopicEvent() is invoked
- doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
-
- final CountDownLatch latch = catchRecursion(true);
-
- Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
- mgr.handle(msg);
-
- verify(dmaap, times(START_PUB)).publish(any());
- verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
- // ensure we made it past both beforeXxx() methods
- assertEquals(0, latch.getCount());
+ assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
}
@Test
@@ -971,20 +756,18 @@ public class PoolingManagerImplTest {
// null assignments should cause message to be processed locally
mgr.startDistributing(null);
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
- // route the message to this host
+ // message for this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
- // route the message to the other host
+ // message for another host
mgr.startDistributing(makeAssignments(false));
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@Test
@@ -1088,9 +871,7 @@ public class PoolingManagerImplTest {
private void validateHandleReqId(String requestId) throws PoolingFeatureException {
startMgr();
- when(extractors.extract(any())).thenReturn(requestId);
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
private void validateNoForward() throws PoolingFeatureException {
@@ -1099,67 +880,23 @@ public class PoolingManagerImplTest {
// route the message to this host
mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
verify(dmaap, times(START_PUB)).publish(any());
}
- private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+ private void validateUnhandled() throws PoolingFeatureException {
startMgr();
-
- // route the message to the *OTHER* host
- mgr.startDistributing(assignments);
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(publishCount)).publish(any());
+ assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
- private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+ private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
startMgr();
- assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
- }
-
- /**
- * Configure the mock controller to act like a real controller, invoking beforeOffer
- * and then beforeInsert, so we can make sure they pass through. We'll keep count to
- * ensure we don't get into infinite recursion.
- *
- * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
- * {@code false} if it should be skipped
- *
- * @return a latch that will be counted down if both beforeXxx() methods return false
- */
- private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
- CountDownLatch recursion = new CountDownLatch(3);
- CountDownLatch latch = new CountDownLatch(1);
-
- doAnswer(args -> {
-
- recursion.countDown();
- if (recursion.getCount() == 0) {
- fail("recursive calls to onTopicEvent");
- }
-
- int iarg = 0;
- CommInfrastructure proto = args.getArgument(iarg++);
- String topic = args.getArgument(iarg++);
- String event = args.getArgument(iarg++);
-
- if (mgr.beforeOffer(proto, topic, event)) {
- return null;
- }
- if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
- return null;
- }
-
- latch.countDown();
-
- return null;
- }).when(controller).onTopicEvent(any(), any(), any());
+ // buckets have null targets
+ mgr.startDistributing(bucketAssignments);
- return latch;
+ assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
/**
@@ -1170,7 +907,7 @@ public class PoolingManagerImplTest {
* @return a new bucket assignment
*/
private BucketAssignments makeAssignments(boolean sameHost) {
- int slot = REQUEST_ID.hashCode() % 2;
+ int slot = DECODED_EVENT.hashCode() % 2;
// slot numbers are 0 and 1 - reverse them if it's for a different host
if (!sameHost) {
@@ -1199,6 +936,7 @@ public class PoolingManagerImplTest {
*/
private void lockMgr() {
mgr.beforeLock();
+ when(controller.isLocked()).thenReturn(true);
}
/**
@@ -1206,6 +944,7 @@ public class PoolingManagerImplTest {
*/
private void unlockMgr() {
mgr.afterUnlock();
+ when(controller.isLocked()).thenReturn(false);
}
/**
@@ -1227,11 +966,6 @@ public class PoolingManagerImplTest {
}
@Override
- protected ClassExtractors makeClassExtractors(Properties props) {
- return extractors;
- }
-
- @Override
protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
gotDmaap = true;
return dmaap;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
index 662e0a7c..f4cd940c 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/SerializerTest.java
@@ -23,15 +23,9 @@ package org.onap.policy.drools.pooling;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import com.google.gson.JsonParseException;
-import java.util.Map;
-import java.util.TreeMap;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
@@ -44,33 +38,6 @@ public class SerializerTest {
}
@Test
- @SuppressWarnings("unchecked")
- public void testEncodeFilter() throws Exception {
- final Serializer ser = new Serializer();
-
- /*
- * Ensure raw maps serialize as expected. Use a TreeMap so the field
- * order is predictable.
- */
- Map<String, Object> top = new TreeMap<>();
- Map<String, Object> inner = new TreeMap<>();
- top.put("abc", 20);
- top.put("def", inner);
- top.put("ghi", true);
- inner.put("xyz", 30);
- assertEquals("{'abc':20,'def':{'xyz':30},'ghi':true}".replace('\'', '"'), ser.encodeFilter(top));
-
- /*
- * Ensure we can encode a complicated filter without throwing an
- * exception
- */
- Map<String, Object> complexFilter = makeAnd(makeEquals("fieldC", "valueC"),
- makeOr(makeEquals("fieldA", "valueA"), makeEquals("fieldB", "valueB")));
- String val = ser.encodeFilter(complexFilter);
- assertFalse(val.isEmpty());
- }
-
- @Test
public void testEncodeMsg_testDecodeMsg() throws Exception {
Serializer ser = new Serializer();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
deleted file mode 100644
index 0bf087a3..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTest.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.function.Function;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ClassExtractorsTest {
-
- private static final int NTIMES = 5;
-
- private static final String MY_TYPE = "theType";
- private static final String PROP_PREFIX = "extractor." + MY_TYPE + ".";
-
- private static final String VALUE = "a value";
- private static final Integer INT_VALUE = 10;
- private static final Integer INT_VALUE2 = 20;
-
- private Properties props;
- private ClassExtractors map;
-
- /**
- * Setup.
- *
- */
- @Before
- public void setUp() {
- props = new Properties();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- }
-
- @Test
- public void testExtract() {
- Simple obj = new Simple();
- assertEquals(INT_VALUE, map.extract(obj));
-
- // string value
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-
- // null object
- assertNull(map.extract(null));
-
- // values from two different kinds of objects
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Simple()));
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass method, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(VALUE, map.extract(new Sub()));
-
- // values from a superclass field, but property defined for subclass
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(INT_VALUE, map.extract(new Sub()));
-
-
- // prefix includes trailing "."
- props = new Properties();
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
- map = new ClassExtractors(props, PROP_PREFIX.substring(0, PROP_PREFIX.length() - 1), MY_TYPE);
- assertEquals(INT_VALUE, map.extract(new Simple()));
-
-
- // values from an class in a different file
- props = new Properties();
- props.setProperty(PROP_PREFIX + ClassExtractorsTestSupport.class.getName(), "${nested.theValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- assertEquals(ClassExtractorsTestSupport2.NESTED_VALUE, map.extract(new ClassExtractorsTestSupport()));
- }
-
- @Test
- public void testGetExtractor() {
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertEquals("x=" + x, INT_VALUE, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testBuildExtractorClass_TopLevel() {
- // extractor defined for top-level class
- props = new Properties();
- props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_SuperClass() {
- // extractor defined for superclass (interface)
- assertEquals(VALUE, map.extract(new Sub()));
-
- // one extractor for top-level class and one for interface
- assertEquals(2, map.size());
- }
-
- @Test
- public void testBuildExtractorClass_NotDefined() {
- // no extractor defined for "this" class
- assertNull(map.extract(this));
-
- // one NULL extractor for top-level class
- assertEquals(1, map.size());
- }
-
- @Test
- public void testBuildExtractorClassString() {
- // no leading "${"
- assertNull(tryIt(Simple.class, "intValue}", xxx -> new Simple()));
-
- // no trailing "}"
- assertNull(tryIt(Simple.class, "${intValue", xxx -> new Simple()));
-
- // leading "."
- assertNull(tryIt(Sub.class, "${.simple.strValue}", xxx -> new Sub()));
-
- // trailing "."
- assertNull(tryIt(Sub.class, "${simple.strValue.}", xxx -> new Sub()));
-
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // two components
- assertEquals(VALUE, tryIt(Sub.class, "${simple.strValue}", xxx -> new Sub()));
-
- // invalid component
- assertNull(tryIt(Sub.class, "${unknown}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InSuper() {
- // field in the superclass
- assertEquals(INT_VALUE, tryIt(Super.class, "${intValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testGetClassExtractor_InInterface() {
- // defined in the interface
- assertEquals(VALUE, map.extract(new Sub()));
- }
-
- @Test
- public void testNullExtractorExtract() {
- // empty properties - should only create NullExtractor
- map = new ClassExtractors(new Properties(), PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
-
- // repeat - shouldn't re-create the extractor
- for (int x = 0; x < NTIMES; ++x) {
- assertNull("x=" + x, map.extract(obj));
- assertEquals("x=" + x, 1, map.size());
- }
- }
-
- @Test
- public void testComponetizedExtractor() {
- // one component
- assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
- // three components
- assertEquals(VALUE, tryIt(Sub.class, "${cont.data.strValue}", xxx -> new Sub()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Method() {
- assertEquals(INT_VALUE, tryIt(Simple.class, "${intValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Field() {
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Map() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorBuildExtractor_Unknown() {
- assertNull(tryIt(Simple.class, "${unknown2}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorExtract_MiddleNull() {
- // data component is null
- assertEquals(null, tryIt(Sub.class, "${cont.data.strValue}", xxx -> {
- Sub obj = new Sub();
- obj.cont.simpleValue = null;
- return obj;
- }));
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor_VoidMethod() {
- // tell it to use getVoidValue()
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${voidValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- Simple obj = new Simple();
- assertNull(map.extract(obj));
-
- assertFalse(obj.voidInvoked);
- }
-
- @Test
- public void testComponetizedExtractorGetMethodExtractor() {
- assertEquals(INT_VALUE, map.extract(new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetFieldExtractor() {
- // use a field
- assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- Map<String, Object> outer = new TreeMap<>();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- @Test
- public void testComponetizedExtractorGetMapExtractor_MapSubclass() {
- Map<String, Object> inner = new TreeMap<>();
- inner.put("inner1", "abc1");
- inner.put("inner2", "abc2");
-
- MapSubclass outer = new MapSubclass();
- outer.put("outer1", "def1");
- outer.put("outer2", inner);
-
- Simple obj = new Simple();
-
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals(null, map.extract(obj));
-
- obj.mapValue = outer;
- props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
- assertEquals("abc2", map.extract(obj));
- }
-
- /**
- * Sets a property for the given class, makes an object, and then returns
- * the value extracted.
- *
- * @param clazz class whose property is to be set
- * @param propval value to which to set the property
- * @param makeObj function to create the object whose data is to be
- * extracted
- * @return the extracted data, or {@code null} if nothing was extracted
- */
- private Object tryIt(Class<?> clazz, String propval, Function<Void, Object> makeObj) {
- Properties props = new Properties();
- props.setProperty(PROP_PREFIX + clazz.getName(), propval);
-
- map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
- return map.extract(makeObj.apply(null));
- }
-
- /**
- * A Map subclass, used to verify that getMapExtractor() still handles it.
- */
- private static class MapSubclass extends TreeMap<String, Object> {
- private static final long serialVersionUID = 1L;
-
- }
-
- /**
- * A simple class.
- */
- private static class Simple {
-
- /**
- * This will not be used because getIntValue() will override it.
- */
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE2;
-
- /**
- * Used to verify retrieval via a field name.
- */
- @SuppressWarnings("unused")
- public final String strValue = VALUE;
-
- /**
- * Used to verify retrieval within maps.
- */
- @SuppressWarnings("unused")
- public Map<String, Object> mapValue = null;
-
- /**
- * {@code True} if {@link #getVoidValue()} was invoked, {@code false}
- * otherwise.
- */
- private boolean voidInvoked = false;
-
- /**
- * This function will supercede the value in the "intValue" field.
- *
- * @return INT_VALUE
- */
- @SuppressWarnings("unused")
- public Integer getIntValue() {
- return INT_VALUE;
- }
-
- /**
- * Used to verify that void functions are not invoked.
- */
- @SuppressWarnings("unused")
- public void getVoidValue() {
- voidInvoked = true;
- }
- }
-
- /**
- * Used to verify multi-component retrieval.
- */
- private static class Container {
- public Simple simpleValue = new Simple();
-
- @SuppressWarnings("unused")
- public Simple getData() {
- return simpleValue;
- }
- }
-
- /**
- * Used to verify extraction when the property refers to an interface.
- */
- private static interface WithString {
-
- String getStrValue();
- }
-
- /**
- * Used to verify retrieval within a superclass.
- */
- private static class Super implements WithString {
-
- @SuppressWarnings("unused")
- public final int intValue = INT_VALUE;
-
- @Override
- public String getStrValue() {
- return VALUE;
- }
- }
-
- /**
- * Used to verify retrieval within a subclass.
- */
- private static class Sub extends Super {
-
- @SuppressWarnings("unused")
- public final Simple simple = new Simple();
-
- /**
- * Used to verify multi-component retrieval.
- */
- public final Container cont = new Container();
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
deleted file mode 100644
index df42fe0f..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport {
-
- private ClassExtractorsTestSupport2 nested = new ClassExtractorsTestSupport2();
-
- /**
- * Constructor.
- */
- public ClassExtractorsTestSupport() {
- super();
- }
-
- public ClassExtractorsTestSupport2 getNested() {
- return nested;
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
deleted file mode 100644
index dddd2510..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ClassExtractorsTestSupport2.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport2 {
-
- public static final int NESTED_VALUE = 30;
-
- public final int theValue = NESTED_VALUE;
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
deleted file mode 100644
index aef0a925..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/ExtractorExceptionTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.onap.policy.common.utils.test.ExceptionsTester;
-
-public class ExtractorExceptionTest extends ExceptionsTester {
-
- @Test
- public void test() {
- assertEquals(5, test(ExtractorException.class));
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
deleted file mode 100644
index 7536d007..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/FieldExtractorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Field field;
- private FieldExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- field = MyClass.class.getDeclaredField("value");
- ext = new FieldExtractor(field);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- obj.value = null;
- assertEquals(null, ext.extract(obj));
-
- obj.value = VALUE + "X";
- assertEquals(VALUE + "X", ext.extract(obj));
-
- // different value type
- field = MyClass.class.getDeclaredField("value2");
- ext = new FieldExtractor(field);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- private static class MyClass {
- @SuppressWarnings("unused")
- public String value = VALUE;
-
- @SuppressWarnings("unused")
- public int value2 = INT_VALUE;
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
deleted file mode 100644
index 74694579..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MapExtractorTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MapExtractorTest {
- private static final String KEY = "a.key";
- private static final String VALUE = "a.value";
-
- private MapExtractor ext;
-
- @Before
- public void setUp() {
- ext = new MapExtractor(KEY);
- }
-
- @Test
- public void testExtract_NotAMap() {
-
- // object is not a map (i.e., it's a String)
- assertNull(ext.extract(KEY));
- }
-
- @Test
- public void testExtract_MissingValue() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
-
- // object is a map, but doesn't have the key
- assertNull(ext.extract(map));
- }
-
- @Test
- public void testExtract() {
-
- Map<String, Object> map = new HashMap<>();
- map.put(KEY + "x", VALUE + "x");
- map.put(KEY, VALUE);
-
- // object is a map and contains the key
- assertEquals(VALUE, ext.extract(map));
-
- // change to value to a different type
- map.put(KEY, 20);
- assertEquals(20, ext.extract(map));
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
deleted file mode 100644
index 41f731fb..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/extractor/MethodExtractorTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Method;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MethodExtractorTest {
-
- private static final String VALUE = "the value";
- private static final Integer INT_VALUE = 10;
-
- private Method meth;
- private MethodExtractor ext;
-
- @Before
- public void setUp() throws Exception {
- meth = MyClass.class.getMethod("getValue");
- ext = new MethodExtractor(meth);
- }
-
- @Test
- public void testExtract() throws Exception {
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // repeat
- assertEquals(VALUE, ext.extract(new MyClass()));
-
- // null value
- MyClass obj = new MyClass();
- meth = MyClass.class.getMethod("getNullValue");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(obj));
-
- // different value type
- meth = MyClass.class.getMethod("getIntValue");
- ext = new MethodExtractor(meth);
- assertEquals(INT_VALUE, ext.extract(new MyClass()));
- }
-
- @Test
- public void testExtract_ArgEx() {
- // pass it the wrong class type
- assertNull(ext.extract(this));
- }
-
- @Test
- public void testExtract_InvokeEx() throws Exception {
- // invoke method that throws an exception
- meth = MyClass.class.getMethod("throwException");
- ext = new MethodExtractor(meth);
- assertEquals(null, ext.extract(new MyClass()));
- }
-
- private static class MyClass {
-
- @SuppressWarnings("unused")
- public String getValue() {
- return VALUE;
- }
-
- @SuppressWarnings("unused")
- public int getIntValue() {
- return INT_VALUE;
- }
-
- @SuppressWarnings("unused")
- public String getNullValue() {
- return null;
- }
-
- @SuppressWarnings("unused")
- public String throwException() {
- throw new IllegalStateException("expected");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
deleted file mode 100644
index 99df69ec..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-
-public class ForwardTest extends SupportBasicMessageTester<Forward> {
- // values set by makeValidMessage()
- public static final CommInfrastructure VALID_PROTOCOL = CommInfrastructure.UEB;
- public static final int VALID_HOPS = 0;
- public static final String VALID_TOPIC = "topicA";
- public static final String VALID_PAYLOAD = "payloadA";
- public static final String VALID_REQUEST_ID = "requestIdA";
-
- /**
- * Time, in milliseconds, after which the most recent message was created.
- */
- private static long tcreateMs;
-
- public ForwardTest() {
- super(Forward.class);
- }
-
- @Test
- public void testBumpNumHops() {
- Forward msg = makeValidMessage();
-
- for (int x = 0; x < 3; ++x) {
- assertEquals("x=" + x, x, msg.getNumHops());
- msg.bumpNumHops();
- }
- }
-
- @Test
- public void testGetNumHops_testSetNumHops() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_HOPS, msg.getNumHops());
-
- msg.setNumHops(5);
- assertEquals(5, msg.getNumHops());
-
- msg.setNumHops(7);
- assertEquals(7, msg.getNumHops());
- }
-
- @Test
- public void testGetCreateTimeMs_testSetCreateTimeMs() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
-
- msg.setCreateTimeMs(1000L);
- assertEquals(1000L, msg.getCreateTimeMs());
-
- msg.setCreateTimeMs(2000L);
- assertEquals(2000L, msg.getCreateTimeMs());
- }
-
- @Test
- public void testGetProtocol_testSetProtocol() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.DMAAP);
- assertEquals(CommInfrastructure.DMAAP, msg.getProtocol());
-
- msg.setProtocol(CommInfrastructure.UEB);
- assertEquals(CommInfrastructure.UEB, msg.getProtocol());
- }
-
- @Test
- public void testGetTopic_testSetTopic() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_TOPIC, msg.getTopic());
-
- msg.setTopic("topicX");
- assertEquals("topicX", msg.getTopic());
-
- msg.setTopic("topicY");
- assertEquals("topicY", msg.getTopic());
- }
-
- @Test
- public void testGetPayload_testSetPayload() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_PAYLOAD, msg.getPayload());
-
- msg.setPayload("payloadX");
- assertEquals("payloadX", msg.getPayload());
-
- msg.setPayload("payloadY");
- assertEquals("payloadY", msg.getPayload());
- }
-
- @Test
- public void testGetRequestId_testSetRequestId() {
- Forward msg = makeValidMessage();
-
- // from constructor
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
-
- msg.setRequestId("reqX");
- assertEquals("reqX", msg.getRequestId());
-
- msg.setRequestId("reqY");
- assertEquals("reqY", msg.getRequestId());
- }
-
- @Test
- public void testIsExpired() {
- Forward msg = makeValidMessage();
-
- long tcreate = msg.getCreateTimeMs();
- assertTrue(msg.isExpired(tcreate + 1));
- assertTrue(msg.isExpired(tcreate + 10));
-
- assertFalse(msg.isExpired(tcreate));
- assertFalse(msg.isExpired(tcreate - 1));
- assertFalse(msg.isExpired(tcreate - 10));
- }
-
- @Test
- public void testCheckValidity_InvalidFields() throws Exception {
- // null source (i.e., superclass field)
- expectCheckValidityFailure(msg -> msg.setSource(null));
-
- // null protocol
- expectCheckValidityFailure(msg -> msg.setProtocol(null));
-
- // null or empty topic
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setTopic(value));
-
- // null payload
- expectCheckValidityFailure(msg -> msg.setPayload(null));
-
- // empty payload should NOT throw an exception
- Forward forward = makeValidMessage();
- forward.setPayload("");
- forward.checkValidity();
-
- // null or empty requestId
- expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setRequestId(value));
-
- // invalid hop count
- expectCheckValidityFailure(msg -> msg.setNumHops(-1));
- }
-
- @Override
- public Forward makeValidMessage() {
- tcreateMs = System.currentTimeMillis();
-
- Forward msg = new Forward(VALID_HOST, VALID_PROTOCOL, VALID_TOPIC, VALID_PAYLOAD, VALID_REQUEST_ID);
- msg.setChannel(VALID_CHANNEL);
-
- return msg;
- }
-
- @Override
- public void testDefaultConstructorFields(Forward msg) {
- super.testDefaultConstructorFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertEquals(0, msg.getCreateTimeMs());
- assertNull(msg.getPayload());
- assertNull(msg.getProtocol());
- assertNull(msg.getRequestId());
- assertNull(msg.getTopic());
- }
-
- @Override
- public void testValidFields(Forward msg) {
- super.testValidFields(msg);
-
- assertEquals(VALID_HOPS, msg.getNumHops());
- assertTrue(msg.getCreateTimeMs() >= tcreateMs);
- assertEquals(VALID_PAYLOAD, msg.getPayload());
- assertEquals(VALID_PROTOCOL, msg.getProtocol());
- assertEquals(VALID_REQUEST_ID, msg.getRequestId());
- assertEquals(VALID_TOPIC, msg.getTopic());
- }
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index d8ed62c8..771f694e 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -36,7 +36,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
@@ -44,7 +43,6 @@ import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -76,17 +74,6 @@ public class ActiveStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessHeartbeat_NullHost() {
assertNull(state.process(new Heartbeat()));
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
deleted file mode 100644
index f4eb870e..00000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/FilterUtilsTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
-import org.junit.Test;
-
-public class FilterUtilsTest {
-
- @Test
- public void testMakeEquals() {
- checkEquals("abc", "def", makeEquals("abc", "def"));
- }
-
- @Test
- public void testMakeAnd() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
-
- checkArray(CLASS_AND, 3, filter);
- checkEquals("an1", "av1", getItem(filter, 0));
- checkEquals("an2", "av2", getItem(filter, 1));
- checkEquals("an3", "av3", getItem(filter, 2));
- }
-
- @Test
- public void testMakeOr() {
- @SuppressWarnings("unchecked")
- Map<String, Object> filter =
- makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
-
- checkArray(CLASS_OR, 3, filter);
- checkEquals("on1", "ov1", getItem(filter, 0));
- checkEquals("on2", "ov2", getItem(filter, 1));
- checkEquals("on3", "ov3", getItem(filter, 2));
- }
-
- /**
- * Checks that the filter contains an array.
- *
- * @param expectedClassName type of filter this should represent
- * @param expectedCount number of items expected in the array
- * @param filter filter to be examined
- */
- protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
- assertEquals(expectedClassName, filter.get(JSON_CLASS));
-
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
- assertEquals(expectedCount, val.length);
- }
-
- /**
- * Checks that a map represents an "equals".
- *
- * @param name name of the field on the left side of the equals
- * @param value value on the right side of the equals
- * @param map map whose content is to be examined
- */
- protected void checkEquals(String name, String value, Map<String, Object> map) {
- assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
- assertEquals(name, map.get(JSON_FIELD));
- assertEquals(value, map.get(JSON_VALUE));
- }
-
- /**
- * Gets a particular sub-filter from the array contained within a filter.
- *
- * @param filter containing filter
- * @param index index of the sub-filter of interest
- * @return the sub-filter with the given index
- */
- @SuppressWarnings("unchecked")
- protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
- Object[] val = (Object[]) filter.get(JSON_FILTERS);
-
- return (Map<String, Object>) val[index];
- }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
index 9e3ddcf9..5cc88d3a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/IdleStateTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,21 +21,18 @@
package org.onap.policy.drools.pooling.state;
import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -54,26 +51,6 @@ public class IdleStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
verifyNothingPublished();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
index ab468a1c..f8f70461 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/InactiveStateTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,14 +29,12 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Query;
public class InactiveStateTest extends SupportBasicStateTester {
@@ -56,17 +54,6 @@ public class InactiveStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessLeader() {
State next = mock(State.class);
when(mgr.goActive()).thenReturn(next);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
index 7dc7b2fd..e7c9db72 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ProcessingStateTest.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
@@ -59,17 +58,6 @@ public class ProcessingStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testProcessQuery() {
State next = mock(State.class);
when(mgr.goQuery()).thenReturn(next);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index aa999b5d..70abb96a 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,14 +32,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
public class QueryStateTest extends SupportBasicStateTester {
@@ -58,17 +56,6 @@ public class QueryStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testGoQuery() {
assertNull(state.goQuery());
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 142fbf7e..3d64687f 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -31,16 +31,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Before;
import org.junit.Test;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -60,24 +57,6 @@ public class StartStateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-
- // get the sub-filter
- filter = utils.getItem(filter, 1);
-
- utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
- utils.getItem(filter, 1));
- }
-
- @Test
public void testStart() {
state.start();
@@ -142,15 +121,6 @@ public class StartStateTest extends SupportBasicStateTester {
}
@Test
- public void testProcessForward() {
- Forward msg = new Forward();
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
Heartbeat msg = new Heartbeat();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index 5284ed11..87868a76 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -31,17 +31,14 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
@@ -125,17 +122,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testGetFilter() {
- Map<String, Object> filter = state.getFilter();
-
- FilterUtilsTest utils = new FilterUtilsTest();
-
- utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
- utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
- utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
- }
-
- @Test
public void testStart() {
assertThatCode(() -> state.start()).doesNotThrowAnyException();
}
@@ -213,19 +199,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testProcessForward() {
- Forward msg = new Forward();
- assertNull(state.process(msg));
-
- verify(mgr, never()).handle(msg);
-
- msg.setChannel(MY_HOST);
- assertNull(state.process(msg));
-
- verify(mgr).handle(msg);
- }
-
- @Test
public void testProcessHeartbeat() {
assertNull(state.process(new Heartbeat()));
}
@@ -341,16 +314,6 @@ public class StateTest extends SupportBasicStateTester {
}
@Test
- public void testPublishStringForward() {
- String chnl = "channelF";
- Forward msg = new Forward();
-
- state.publish(chnl, msg);
-
- verify(mgr).publish(chnl, msg);
- }
-
- @Test
public void testPublishStringHeartbeat() {
String chnl = "channelH";
Heartbeat msg = new Heartbeat();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
index 1a65c802..18d77ba7 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/SupportBasicStateTester.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
@@ -143,12 +142,7 @@ public class SupportBasicStateTester {
when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
- prevState = new State(mgr) {
- @Override
- public Map<String, Object> getFilter() {
- throw new UnsupportedOperationException("cannot filter");
- }
- };
+ prevState = new State(mgr) {};
// capture publish() arguments
doAnswer(invocation -> {