aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-10-29 18:15:51 -0400
committerJim Hahn <jrh3@att.com>2020-11-02 18:17:49 -0500
commitdc6c4a21d46909dad59f4bd16cb6d4fc29fcce77 (patch)
treeef6bd7f152e189913c802d07aea43c83f3dc97e2 /feature-pooling-dmaap/src/main
parentd418aff3a9fd547941e40978c648d6209d332e37 (diff)
Make feature-pooling-dmaap work without filtering
As DMaaP server-side filtering has been deprecated, modified feature-pooling-dmaap to work without it. The new design assumes that each pdp gets its own unique consumer group, thus all pdps receive all events. Each pdp then uses the bucket assignments to determine whether or not to process the event. Note: this means that events no longer have to be forwarded to the correct host, thus the "Forward" class has been deleted. Other than that, the code already did post-filtering of events so most of it still works even without server-side filtering. As a result, most of the effort was in simply removing code that no longer applies. Per review comments: Modified code to use the event hash code instead of the request ID has code when routing events. This eliminated the need for the extractor classes and related properties. Replaced amsterdam and beijing properties with usecases properties. Issue-ID: POLICY-2881 Change-Id: I87e4f98c14f419593879c278d7da053c80575553 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'feature-pooling-dmaap/src/main')
-rw-r--r--feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java33
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java54
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java10
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java221
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java458
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java36
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java58
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java61
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java22
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java179
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java49
17 files changed, 83 insertions, 1393 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
index 2786a613..59c4b472 100644
--- a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-pooling-dmaap
# ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,34 +65,23 @@
# Topic used for inter-host communication for a particular controller
# pooling.<controller-name>.topic=XXX
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
# Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties. However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties. However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
# the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 9ba844ed..08c82fea 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
@@ -46,7 +45,7 @@ public class DmaapManager {
/**
* Topic source whose filter is to be manipulated.
*/
- private final FilterableTopicSource topicSource;
+ private final TopicSource topicSource;
/**
* Where to publish messages.
@@ -79,9 +78,6 @@ public class DmaapManager {
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
- // verify that we can set the filter
- setFilter(null);
-
} catch (IllegalArgumentException e) {
logger.error("failed to attach to topic {}", topic);
throw new PoolingFeatureException(e);
@@ -98,15 +94,10 @@ public class DmaapManager {
* @return the topic source
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
- private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ private TopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : getTopicSources()) {
if (topic.equals(src.getTopic())) {
- if (src instanceof FilterableTopicSource) {
- return (FilterableTopicSource) src;
-
- } else {
- throw new PoolingFeatureException("topic source " + topic + " is not filterable");
- }
+ return src;
}
}
@@ -199,22 +190,6 @@ public class DmaapManager {
}
/**
- * Sets the server-side filter to be used by the consumer.
- *
- * @param filter the filter string, or {@code null} if no filter is to be used
- * @throws PoolingFeatureException if the topic is not filterable
- */
- public void setFilter(String filter) throws PoolingFeatureException {
- try {
- logger.debug("change filter for topic {} to {}", topic, filter);
- topicSource.setFilter(filter);
-
- } catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic, e);
- }
- }
-
- /**
* Publishes a message to the sink.
*
* @param msg message to be published
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 00b7a215..c7574925 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -77,11 +77,11 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
- * later. As multiple threads can be active within the methods at the same time, we must keep
- * this in thread local storage.
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
- private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
/**
* Constructor.
@@ -236,19 +236,19 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- if (mgr.beforeOffer(protocol, topic2, event)) {
+ if (mgr.beforeOffer(topic2, event)) {
return true;
}
- offerArgs.set(new OfferArgs(protocol, topic2, event));
+ offerTopics.set(topic2);
return false;
}
@Override
public boolean beforeInsert(DroolsController droolsController, Object fact) {
- OfferArgs args = offerArgs.get();
- if (args == null) {
+ String topic = offerTopics.get();
+ if (topic == null) {
logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
@@ -279,7 +279,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ return mgr.beforeInsert(topic, fact);
}
@Override
@@ -287,7 +287,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean success) {
// clear any stored arguments
- offerArgs.remove();
+ offerTopics.remove();
return false;
}
@@ -345,40 +345,6 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
- /**
- * Arguments captured from beforeOffer().
- */
- private static class OfferArgs {
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event text that was received on the topic.
- */
- private String event;
-
- /**
- * Constructor.
- *
- * @param protocol protocol
- * @param topic topic
- * @param event the actual event data received on the topic
- */
- public OfferArgs(CommInfrastructure protocol, String topic, String event) {
- this.protocol = protocol;
- this.topic = topic;
- this.event = event;
- }
- }
-
/*
* The remaining methods may be overridden by junit tests.
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index e3576b8f..cc8e3a4d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
@@ -84,13 +83,6 @@ public interface PoolingManager {
void publish(String channel, Message msg);
/**
- * Handles a {@link Forward} event that was received from the internal topic.
- *
- * @param event event
- */
- void handle(Forward event);
-
- /**
* Schedules a timer to fire after a delay.
*
* @param delayMs delay, in milliseconds
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index b2966101..9093c7c0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling;
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
- /**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
private final CountDownLatch activeLatch;
@@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final DmaapManager dmaapMgr;
/**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
- /**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
@@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
- /**
* Constructs the manager, initializing all of the data structures.
*
* @param host name/uuid of this host
@@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*/
@@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
@@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
@@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
/**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
- /**
* Decodes an event from a String into an event Object.
*
* @param topic2 topic
@@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
- /**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
* decoded.
@@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index 80eee501..4627b9eb 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import java.util.HashMap;
import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
@@ -60,7 +59,6 @@ public class Serializer {
private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
static {
- class2type.put(Forward.class, "forward");
class2type.put(Heartbeat.class, "heartbeat");
class2type.put(Identification.class, "identification");
class2type.put(Leader.class, "leader");
@@ -79,7 +77,7 @@ public class Serializer {
/**
* Encodes a filter.
- *
+ *
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
*/
@@ -89,7 +87,7 @@ public class Serializer {
/**
* Encodes a message.
- *
+ *
* @param msg message to be encoded
* @return the message, serialized as a JSON string
*/
@@ -108,7 +106,7 @@ public class Serializer {
/**
* Decodes a JSON string into a Message.
- *
+ *
* @param msg JSON string representing the message
* @return the message
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
deleted file mode 100644
index bd75995f..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
- private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
- /**
- * Properties that specify how the data is to be extracted from a given
- * class.
- */
- private final Properties properties;
-
- /**
- * Property prefix, including a trailing ".".
- */
- private final String prefix;
-
- /**
- * Type of item to be extracted.
- */
- private final String type;
-
- /**
- * Maps the class name to its extractor.
- */
- private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param props properties that specify how the data is to be extracted from
- * a given class
- * @param prefix property name prefix, prepended before the class name
- * @param type type of item to be extracted
- */
- public ClassExtractors(Properties props, String prefix, String type) {
- this.properties = props;
- this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
- this.type = type;
- }
-
- /**
- * Gets the number of extractors in the map.
- *
- * @return gets the number of extractors in the map
- */
- protected int size() {
- return class2extractor.size();
- }
-
- /**
- * Extracts the desired data item from an object.
- *
- * @param object object from which to extract the data item
- * @return the extracted item, or {@code null} if it could not be extracted
- */
- public Object extract(Object object) {
- if (object == null) {
- return null;
- }
-
- Extractor ext = getExtractor(object);
-
- return ext.extract(object);
- }
-
- /**
- * Gets the extractor for the given type of object, creating one if it
- * doesn't exist yet.
- *
- * @param object object whose extracted is desired
- * @return an extractor for the object
- */
- private Extractor getExtractor(Object object) {
- Class<?> clazz = object.getClass();
- Extractor ext = class2extractor.get(clazz.getName());
-
- if (ext == null) {
- // allocate a new extractor, if another thread doesn't beat us to it
- ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
-
- return ext;
- }
-
- /**
- * Builds an extractor for the class.
- *
- * @param clazz class for which the extractor should be built
- *
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz) {
- String value = properties.getProperty(prefix + clazz.getName(), null);
- if (value != null) {
- // property has config info for this class - build the extractor
- return buildExtractor(clazz, value);
- }
-
- /*
- * Get the extractor, if any, for the super class or interfaces, but
- * don't add one if it doesn't exist
- */
- Extractor ext = getClassExtractor(clazz, false);
- if (ext != null) {
- return ext;
- }
-
- /*
- * No extractor defined for for this class or its super class - we
- * cannot extract data items from objects of this type, so just
- * allocated a null extractor.
- */
- logger.warn("missing property {}{}", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- /**
- * Builds an extractor for the class, based on the config value extracted
- * from the corresponding property.
- *
- * @param clazz class for which the extractor should be built
- * @param value config value (e.g., "${event.request.id}"
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz, String value) {
- if (!value.startsWith("${")) {
- logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
- return new NullExtractor();
- }
-
- if (!value.endsWith("}")) {
- logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // get the part in the middle
- String val = value.substring(2, value.length() - 1);
- if (val.startsWith(".")) {
- logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- if (val.endsWith(".")) {
- logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // everything's valid - create the extractor
- try {
- ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
- /*
- * If there's only one extractor, then just return it, otherwise
- * return the whole extractor.
- */
- return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
- } catch (ExtractorException e) {
- logger.warn("cannot build extractor for {}", clazz.getName(), e);
- return new NullExtractor();
- }
- }
-
- /**
- * Gets the extractor for a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose extractor is desired
- * @param addOk {@code true} if the extractor may be added, provided the
- * property is defined, {@code false} otherwise
- * @return the extractor to be used for the class, or {@code null} if no
- * extractor has been defined yet
- */
- private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
- if (clazz == null) {
- return null;
- }
-
- Extractor ext = null;
-
- if (addOk) {
- String val = properties.getProperty(prefix + clazz.getName(), null);
-
- if (val != null) {
- /*
- * A property is defined for this class, so create the extractor
- * for it.
- */
- return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
- }
-
- // see if the superclass has an extractor
- if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
- return ext;
- }
-
- // check the interfaces, too
- for (Class<?> clz : clazz.getInterfaces()) {
- if ((ext = getClassExtractor(clz, true)) != null) {
- break;
- }
- }
-
- return ext;
- }
-
- /**
- * Extractor that always returns {@code null}. Used when no extractor could
- * be built for a given object type.
- */
- private class NullExtractor implements Extractor {
-
- @Override
- public Object extract(Object object) {
- logger.info("cannot extract {} from {}", type, object.getClass());
- return null;
- }
- }
-
- /**
- * Component-ized extractor. Extracts an object that is referenced
- * hierarchically, where each name identifies a particular component within
- * the hierarchy. Supports retrieval from {@link Map} objects, as well as
- * via getXxx() methods, or by direct field retrieval.
- *
- * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
- */
- private class ComponetizedExtractor implements Extractor {
-
- /**
- * Extractor for each component.
- */
- private final Extractor[] extractors;
-
- /**
- * Constructor.
- *
- * @param clazz the class associated with the object at the root of the
- * hierarchy
- * @param names name associated with each component
- * @throws ExtractorException extractor exception
- */
- public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
- this.extractors = new Extractor[names.length];
-
- Class<?> clz = clazz;
-
- for (int x = 0; x < names.length; ++x) {
- String comp = names[x];
-
- Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
- extractors[x] = pair.getLeft();
- clz = pair.getRight();
- }
- }
-
- /**
- * Builds an extractor for the given component of an object.
- *
- * @param clazz type of object from which the component will be
- * extracted
- * @param comp name of the component to extract
- * @return a pair containing the extractor and the extracted object's
- * type
- * @throws ExtractorException extrator exception
- */
- private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
- Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
- if (pair == null) {
- pair = getFieldExtractor(clazz, comp);
- }
-
- if (pair == null) {
- pair = getMapExtractor(clazz, comp);
- }
-
-
- // didn't find an extractor
- if (pair == null) {
- throw new ExtractorException("class " + clazz + " contains no element " + comp);
- }
-
- return pair;
- }
-
- @Override
- public Object extract(Object object) {
- Object obj = object;
-
- for (Extractor ext : extractors) {
- if (obj == null) {
- break;
- }
-
- obj = ext.extract(obj);
- }
-
- return obj;
- }
-
- /**
- * Gets an extractor that invokes a getXxx() method to retrieve the
- * object.
- *
- * @param clazz container's class
- * @param name name of the property to be retrieved
- * @return a new extractor, or {@code null} if the class does not
- * contain the corresponding getXxx() method
- * @throws ExtractorException if the getXxx() method is inaccessible
- */
- private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
- Method meth;
-
- String nm = "get" + StringUtils.capitalize(name);
-
- try {
- meth = clazz.getMethod(nm);
-
- Class<?> retType = meth.getReturnType();
- if (retType == void.class) {
- // it's a void method, thus it won't return an object
- return null;
- }
-
- return Pair.of(new MethodExtractor(meth), retType);
-
- } catch (NoSuchMethodException expected) {
- // no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
- }
- }
-
- /**
- * Gets an extractor for a field within the object.
- *
- * @param clazz container's class
- * @param name name of the field whose value is to be extracted
- * @return a new extractor, or {@code null} if the class does not
- * contain the given field
- * @throws ExtractorException if the field is inaccessible
- */
- private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
- Field field = getClassField(clazz, name);
- if (field == null) {
- return null;
- }
-
- return Pair.of(new FieldExtractor(field), field.getType());
- }
-
- /**
- * Gets an extractor for an item within a Map object.
- *
- * @param clazz container's class
- * @param key item key within the map
- * @return a new extractor, or {@code null} if the class is not a Map
- * subclass
- */
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
- if (!Map.class.isAssignableFrom(clazz)) {
- return null;
- }
-
- /*
- * Don't know the value's actual type, so we'll assume it's a Map
- * for now. Things should still work OK, as this is only used to
- * direct the constructor on what type of extractor to create next.
- * If the object turns out not to be a map, then the MapExtractor
- * for the next component will just return null.
- */
- return Pair.of(new MapExtractor(key), Map.class);
- }
-
- /**
- * Gets field within a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose field is desired
- * @param name name of the desired field
- * @return the field within the class, or {@code null} if the field does
- * not exist
- * @throws ExtractorException if the field is inaccessible
- */
- private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
- if (clazz == null) {
- return null;
- }
-
- try {
- return clazz.getField(name);
-
- } catch (NoSuchFieldException expected) {
- // no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
- }
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
deleted file mode 100644
index 5e02d602..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
- /**
- * Extracts an object contained within another object.
- *
- * @param object object from which to extract the contained object
- * @return the extracted value, or {@code null} if it cannot be extracted
- */
- Object extract(Object object);
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
deleted file mode 100644
index a864672b..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public ExtractorException() {
- super();
- }
-
- public ExtractorException(String message) {
- super(message);
- }
-
- public ExtractorException(Throwable cause) {
- super(cause);
- }
-
- public ExtractorException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
deleted file mode 100644
index 9389ab22..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
- /**
- * Field containing the object.
- */
- private final Field field;
-
- /**
- * Constructor.
- *
- * @param field field containing the object
- */
- public FieldExtractor(Field field) {
- this.field = field;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return field.get(object);
-
- } catch (IllegalAccessException | IllegalArgumentException e) {
- logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
deleted file mode 100644
index 9c5be5ff..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
- /**
- * Key to the item to extract from the map.
- */
- private final String key;
-
- /**
- * Constructor.
- *
- * @param key key to the item to extract from the map
- */
- public MapExtractor(String key) {
- this.key = key;
- }
-
- @Override
- public Object extract(Object object) {
-
- if (object instanceof Map) {
- Map<?, ?> map = (Map<?, ?>) object;
-
- return map.get(key);
-
- } else {
- logger.warn("expecting a map, instead of {}", object.getClass());
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
deleted file mode 100644
index 3efef5ec..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
- /**
- * Method to invoke to extract the contained object.
- */
- private final Method method;
-
- /**
- * Constructor.
- *
- * @param method method to invoke to extract the contained object
- */
- public MethodExtractor(Method method) {
- this.method = method;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return method.invoke(object);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index 6be080f7..c78fb17b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,7 +63,7 @@ public class BucketAssignments {
/**
* Constructor.
- *
+ *
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
*/
@@ -81,7 +81,7 @@ public class BucketAssignments {
/**
* Gets the leader, which is the host with the minimum UUID.
- *
+ *
* @return the assignment leader
*/
public String getLeader() {
@@ -103,7 +103,7 @@ public class BucketAssignments {
/**
* Determines if a host has an assignment.
- *
+ *
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
@@ -123,7 +123,7 @@ public class BucketAssignments {
/**
* Gets all of the hosts that have an assignment.
- *
+ *
* @return all of the hosts that have an assignment
*/
public Set<String> getAllHosts() {
@@ -143,7 +143,7 @@ public class BucketAssignments {
/**
* Gets the host assigned to a given bucket.
- *
+ *
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
@@ -153,12 +153,12 @@ public class BucketAssignments {
return null;
}
- return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
* Gets the number of buckets.
- *
+ *
* @return the number of buckets
*/
public int size() {
@@ -168,7 +168,7 @@ public class BucketAssignments {
/**
* Checks the validity of the assignments, verifying that all buckets have been
* assigned to a host.
- *
+ *
* @throws PoolingFeatureException if the assignments are invalid
*/
public void checkValidity() throws PoolingFeatureException {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
deleted file mode 100644
index 3c34e971..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
- /**
- * Number of hops (i.e., number of times it's been forwarded) so far.
- */
- private int numHops;
-
- /**
- * Time, in milliseconds, at which the message was created.
- */
- private long createTimeMs;
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event pay load that was received on the topic.
- */
- private String payload;
-
- /**
- * The request id that was extracted from the event.
- */
- private String requestId;
-
- /**
- * Constructor.
- */
- public Forward() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param source host on which the message originated
- * @param protocol protocol
- * @param topic topic
- * @param payload the actual event data received on the topic
- * @param requestId request id
- */
- public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
- super(source);
-
- this.numHops = 0;
- this.createTimeMs = System.currentTimeMillis();
- this.protocol = protocol;
- this.topic = topic;
- this.payload = payload;
- this.requestId = requestId;
- }
-
- /**
- * Increments {@link #numHops}.
- */
- public void bumpNumHops() {
- ++numHops;
- }
-
- public int getNumHops() {
- return numHops;
- }
-
- public void setNumHops(int numHops) {
- this.numHops = numHops;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public void setCreateTimeMs(long createTimeMs) {
- this.createTimeMs = createTimeMs;
- }
-
- public CommInfrastructure getProtocol() {
- return protocol;
- }
-
- public void setProtocol(CommInfrastructure protocol) {
- this.protocol = protocol;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public boolean isExpired(long minCreateTimeMs) {
- return (createTimeMs < minCreateTimeMs);
-
- }
-
- @Override
- public void checkValidity() throws PoolingFeatureException {
-
- super.checkValidity();
-
- if (protocol == null) {
- throw new PoolingFeatureException("missing message protocol");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new PoolingFeatureException("missing message topic");
- }
-
- /*
- * Note: an empty pay load is OK, as an empty message could have been
- * received on the topic.
- */
-
- if (payload == null) {
- throw new PoolingFeatureException("missing message payload");
- }
-
- if (requestId == null || requestId.isEmpty()) {
- throw new PoolingFeatureException("missing message requestId");
- }
-
- if (numHops < 0) {
- throw new PoolingFeatureException("invalid message hop count");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
deleted file mode 100644
index 5b7012d2..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
- // message element names
- public static final String MSG_CHANNEL = "channel";
- public static final String MSG_TIMESTAMP = "timestampMs";
-
- // json element names
- protected static final String JSON_CLASS = "class";
- protected static final String JSON_FILTERS = "filters";
- protected static final String JSON_FIELD = "field";
- protected static final String JSON_VALUE = "value";
-
- // values to be stuck into the "class" element
- protected static final String CLASS_OR = "Or";
- protected static final String CLASS_AND = "And";
- protected static final String CLASS_EQUALS = "Equals";
-
- /**
- * Constructor.
- */
- private FilterUtils() {
- super();
- }
-
- /**
- * Makes a filter that verifies that a field equals a value.
- *
- * @param field name of the field to check
- * @param value desired value
- * @return a map representing an "equals" filter
- */
- public static Map<String, Object> makeEquals(String field, String value) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_EQUALS);
- map.put(JSON_FIELD, field);
- map.put(JSON_VALUE, value);
-
- return map;
- }
-
- /**
- * Makes an "and" filter, where all of the items must be true.
- *
- * @param items items to be checked
- * @return an "and" filter
- */
- public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_AND);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-
- /**
- * Makes an "or" filter, where at least one of the items must be true.
- *
- * @param items items to be checked
- * @return an "or" filter
- */
- public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_OR);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index 59d264ec..c5761bfd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,16 +20,8 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +41,7 @@ public class StartState extends State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
@@ -58,7 +50,7 @@ public class StartState extends State {
/**
* Get Heart beat time stamp in milliseconds.
- *
+ *
* @return the time stamp inserted into the heart beat message
*/
public long getHbTimestampMs() {
@@ -110,14 +102,4 @@ public class StartState extends State {
return null;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getFilter() {
- // ignore everything except our most recent heart beat message
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
- makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index f2277be3..853c24d4 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -20,22 +20,15 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
@@ -71,18 +64,6 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic. The
- * default method returns a filter that accepts messages on the admin channel and on
- * the host's own channel.
- *
- * @return the server-side filter to use.
- */
- @SuppressWarnings("unchecked")
- public Map<String, Object> getFilter() {
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
- }
-
- /**
* Cancels the timers added by this state.
*/
public final void cancelTimers() {
@@ -142,26 +123,6 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to handle and
- * returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Forward msg) {
- if (getHost().equals(msg.getChannel())) {
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
-
- } else {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
- getTopic());
- }
-
- return null;
- }
-
- /**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
@@ -293,16 +254,6 @@ public abstract class State {
* @param channel channel
* @param msg message to be published
*/
- protected final void publish(String channel, Forward msg) {
- mgr.publish(channel, msg);
- }
-
- /**
- * Publishes a message on the specified channel.
- *
- * @param channel channel
- * @param msg message to be published
- */
protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}