aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main')
-rw-r--r--feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java33
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java54
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java10
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java221
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java14
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java458
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java36
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java49
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java58
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java61
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java22
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java179
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java96
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java28
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java49
17 files changed, 83 insertions, 1393 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
index 2786a613..59c4b472 100644
--- a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -2,14 +2,14 @@
# ============LICENSE_START=======================================================
# feature-pooling-dmaap
# ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -65,34 +65,23 @@
# Topic used for inter-host communication for a particular controller
# pooling.<controller-name>.topic=XXX
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
# Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties. However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties. However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
# the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 9ba844ed..08c82fea 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
@@ -46,7 +45,7 @@ public class DmaapManager {
/**
* Topic source whose filter is to be manipulated.
*/
- private final FilterableTopicSource topicSource;
+ private final TopicSource topicSource;
/**
* Where to publish messages.
@@ -79,9 +78,6 @@ public class DmaapManager {
this.topicSource = findTopicSource();
this.topicSink = findTopicSink();
- // verify that we can set the filter
- setFilter(null);
-
} catch (IllegalArgumentException e) {
logger.error("failed to attach to topic {}", topic);
throw new PoolingFeatureException(e);
@@ -98,15 +94,10 @@ public class DmaapManager {
* @return the topic source
* @throws PoolingFeatureException if the source doesn't exist or is not filterable
*/
- private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+ private TopicSource findTopicSource() throws PoolingFeatureException {
for (TopicSource src : getTopicSources()) {
if (topic.equals(src.getTopic())) {
- if (src instanceof FilterableTopicSource) {
- return (FilterableTopicSource) src;
-
- } else {
- throw new PoolingFeatureException("topic source " + topic + " is not filterable");
- }
+ return src;
}
}
@@ -199,22 +190,6 @@ public class DmaapManager {
}
/**
- * Sets the server-side filter to be used by the consumer.
- *
- * @param filter the filter string, or {@code null} if no filter is to be used
- * @throws PoolingFeatureException if the topic is not filterable
- */
- public void setFilter(String filter) throws PoolingFeatureException {
- try {
- logger.debug("change filter for topic {} to {}", topic, filter);
- topicSource.setFilter(filter);
-
- } catch (UnsupportedOperationException e) {
- throw new PoolingFeatureException("cannot filter topic " + topic, e);
- }
- }
-
- /**
* Publishes a message to the sink.
*
* @param msg message to be published
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 00b7a215..c7574925 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -77,11 +77,11 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
- * later. As multiple threads can be active within the methods at the same time, we must keep
- * this in thread local storage.
+ * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+ * called later. As multiple threads can be active within the methods at the same
+ * time, we must keep this in thread local storage.
*/
- private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+ private ThreadLocal<String> offerTopics = new ThreadLocal<>();
/**
* Constructor.
@@ -236,19 +236,19 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- if (mgr.beforeOffer(protocol, topic2, event)) {
+ if (mgr.beforeOffer(topic2, event)) {
return true;
}
- offerArgs.set(new OfferArgs(protocol, topic2, event));
+ offerTopics.set(topic2);
return false;
}
@Override
public boolean beforeInsert(DroolsController droolsController, Object fact) {
- OfferArgs args = offerArgs.get();
- if (args == null) {
+ String topic = offerTopics.get();
+ if (topic == null) {
logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
return false;
}
@@ -279,7 +279,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
return false;
}
- return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+ return mgr.beforeInsert(topic, fact);
}
@Override
@@ -287,7 +287,7 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean success) {
// clear any stored arguments
- offerArgs.remove();
+ offerTopics.remove();
return false;
}
@@ -345,40 +345,6 @@ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerF
boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
}
- /**
- * Arguments captured from beforeOffer().
- */
- private static class OfferArgs {
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event text that was received on the topic.
- */
- private String event;
-
- /**
- * Constructor.
- *
- * @param protocol protocol
- * @param topic topic
- * @param event the actual event data received on the topic
- */
- public OfferArgs(CommInfrastructure protocol, String topic, String event) {
- this.protocol = protocol;
- this.topic = topic;
- this.event = event;
- }
- }
-
/*
* The remaining methods may be overridden by junit tests.
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index e3576b8f..cc8e3a4d 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
package org.onap.policy.drools.pooling;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
@@ -84,13 +83,6 @@ public interface PoolingManager {
void publish(String channel, Message msg);
/**
- * Handles a {@link Forward} event that was received from the internal topic.
- *
- * @param event event
- */
- void handle(Forward event);
-
- /**
* Schedules a timer to fire after a delay.
*
* @param delayMs delay, in milliseconds
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index b2966101..9093c7c0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling;
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
- /**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
private final CountDownLatch activeLatch;
@@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final DmaapManager dmaapMgr;
/**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
- /**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
@@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
- /**
* Constructs the manager, initializing all of the data structures.
*
* @param host name/uuid of this host
@@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*/
@@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
@@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
@@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
/**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
- /**
* Decodes an event from a String into an event Object.
*
* @param topic2 topic
@@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
- /**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
* decoded.
@@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index 80eee501..4627b9eb 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import java.util.HashMap;
import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
@@ -60,7 +59,6 @@ public class Serializer {
private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
static {
- class2type.put(Forward.class, "forward");
class2type.put(Heartbeat.class, "heartbeat");
class2type.put(Identification.class, "identification");
class2type.put(Leader.class, "leader");
@@ -79,7 +77,7 @@ public class Serializer {
/**
* Encodes a filter.
- *
+ *
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
*/
@@ -89,7 +87,7 @@ public class Serializer {
/**
* Encodes a message.
- *
+ *
* @param msg message to be encoded
* @return the message, serialized as a JSON string
*/
@@ -108,7 +106,7 @@ public class Serializer {
/**
* Decodes a JSON string into a Message.
- *
+ *
* @param msg JSON string representing the message
* @return the message
*/
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
deleted file mode 100644
index bd75995f..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ClassExtractors.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code>&lt;a.prefix>.&lt;class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
- private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
- /**
- * Properties that specify how the data is to be extracted from a given
- * class.
- */
- private final Properties properties;
-
- /**
- * Property prefix, including a trailing ".".
- */
- private final String prefix;
-
- /**
- * Type of item to be extracted.
- */
- private final String type;
-
- /**
- * Maps the class name to its extractor.
- */
- private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param props properties that specify how the data is to be extracted from
- * a given class
- * @param prefix property name prefix, prepended before the class name
- * @param type type of item to be extracted
- */
- public ClassExtractors(Properties props, String prefix, String type) {
- this.properties = props;
- this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
- this.type = type;
- }
-
- /**
- * Gets the number of extractors in the map.
- *
- * @return gets the number of extractors in the map
- */
- protected int size() {
- return class2extractor.size();
- }
-
- /**
- * Extracts the desired data item from an object.
- *
- * @param object object from which to extract the data item
- * @return the extracted item, or {@code null} if it could not be extracted
- */
- public Object extract(Object object) {
- if (object == null) {
- return null;
- }
-
- Extractor ext = getExtractor(object);
-
- return ext.extract(object);
- }
-
- /**
- * Gets the extractor for the given type of object, creating one if it
- * doesn't exist yet.
- *
- * @param object object whose extracted is desired
- * @return an extractor for the object
- */
- private Extractor getExtractor(Object object) {
- Class<?> clazz = object.getClass();
- Extractor ext = class2extractor.get(clazz.getName());
-
- if (ext == null) {
- // allocate a new extractor, if another thread doesn't beat us to it
- ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
-
- return ext;
- }
-
- /**
- * Builds an extractor for the class.
- *
- * @param clazz class for which the extractor should be built
- *
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz) {
- String value = properties.getProperty(prefix + clazz.getName(), null);
- if (value != null) {
- // property has config info for this class - build the extractor
- return buildExtractor(clazz, value);
- }
-
- /*
- * Get the extractor, if any, for the super class or interfaces, but
- * don't add one if it doesn't exist
- */
- Extractor ext = getClassExtractor(clazz, false);
- if (ext != null) {
- return ext;
- }
-
- /*
- * No extractor defined for for this class or its super class - we
- * cannot extract data items from objects of this type, so just
- * allocated a null extractor.
- */
- logger.warn("missing property {}{}", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- /**
- * Builds an extractor for the class, based on the config value extracted
- * from the corresponding property.
- *
- * @param clazz class for which the extractor should be built
- * @param value config value (e.g., "${event.request.id}"
- * @return a new extractor
- */
- private Extractor buildExtractor(Class<?> clazz, String value) {
- if (!value.startsWith("${")) {
- logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
- return new NullExtractor();
- }
-
- if (!value.endsWith("}")) {
- logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // get the part in the middle
- String val = value.substring(2, value.length() - 1);
- if (val.startsWith(".")) {
- logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- if (val.endsWith(".")) {
- logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
- return new NullExtractor();
- }
-
- // everything's valid - create the extractor
- try {
- ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
- /*
- * If there's only one extractor, then just return it, otherwise
- * return the whole extractor.
- */
- return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
- } catch (ExtractorException e) {
- logger.warn("cannot build extractor for {}", clazz.getName(), e);
- return new NullExtractor();
- }
- }
-
- /**
- * Gets the extractor for a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose extractor is desired
- * @param addOk {@code true} if the extractor may be added, provided the
- * property is defined, {@code false} otherwise
- * @return the extractor to be used for the class, or {@code null} if no
- * extractor has been defined yet
- */
- private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
- if (clazz == null) {
- return null;
- }
-
- Extractor ext = null;
-
- if (addOk) {
- String val = properties.getProperty(prefix + clazz.getName(), null);
-
- if (val != null) {
- /*
- * A property is defined for this class, so create the extractor
- * for it.
- */
- return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
- }
- }
-
- // see if the superclass has an extractor
- if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
- return ext;
- }
-
- // check the interfaces, too
- for (Class<?> clz : clazz.getInterfaces()) {
- if ((ext = getClassExtractor(clz, true)) != null) {
- break;
- }
- }
-
- return ext;
- }
-
- /**
- * Extractor that always returns {@code null}. Used when no extractor could
- * be built for a given object type.
- */
- private class NullExtractor implements Extractor {
-
- @Override
- public Object extract(Object object) {
- logger.info("cannot extract {} from {}", type, object.getClass());
- return null;
- }
- }
-
- /**
- * Component-ized extractor. Extracts an object that is referenced
- * hierarchically, where each name identifies a particular component within
- * the hierarchy. Supports retrieval from {@link Map} objects, as well as
- * via getXxx() methods, or by direct field retrieval.
- *
- * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
- */
- private class ComponetizedExtractor implements Extractor {
-
- /**
- * Extractor for each component.
- */
- private final Extractor[] extractors;
-
- /**
- * Constructor.
- *
- * @param clazz the class associated with the object at the root of the
- * hierarchy
- * @param names name associated with each component
- * @throws ExtractorException extractor exception
- */
- public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
- this.extractors = new Extractor[names.length];
-
- Class<?> clz = clazz;
-
- for (int x = 0; x < names.length; ++x) {
- String comp = names[x];
-
- Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
- extractors[x] = pair.getLeft();
- clz = pair.getRight();
- }
- }
-
- /**
- * Builds an extractor for the given component of an object.
- *
- * @param clazz type of object from which the component will be
- * extracted
- * @param comp name of the component to extract
- * @return a pair containing the extractor and the extracted object's
- * type
- * @throws ExtractorException extrator exception
- */
- private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
- Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
- if (pair == null) {
- pair = getFieldExtractor(clazz, comp);
- }
-
- if (pair == null) {
- pair = getMapExtractor(clazz, comp);
- }
-
-
- // didn't find an extractor
- if (pair == null) {
- throw new ExtractorException("class " + clazz + " contains no element " + comp);
- }
-
- return pair;
- }
-
- @Override
- public Object extract(Object object) {
- Object obj = object;
-
- for (Extractor ext : extractors) {
- if (obj == null) {
- break;
- }
-
- obj = ext.extract(obj);
- }
-
- return obj;
- }
-
- /**
- * Gets an extractor that invokes a getXxx() method to retrieve the
- * object.
- *
- * @param clazz container's class
- * @param name name of the property to be retrieved
- * @return a new extractor, or {@code null} if the class does not
- * contain the corresponding getXxx() method
- * @throws ExtractorException if the getXxx() method is inaccessible
- */
- private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
- Method meth;
-
- String nm = "get" + StringUtils.capitalize(name);
-
- try {
- meth = clazz.getMethod(nm);
-
- Class<?> retType = meth.getReturnType();
- if (retType == void.class) {
- // it's a void method, thus it won't return an object
- return null;
- }
-
- return Pair.of(new MethodExtractor(meth), retType);
-
- } catch (NoSuchMethodException expected) {
- // no getXxx() method, maybe there's a field by this name
- logger.debug("no method {} in {}", nm, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
- }
- }
-
- /**
- * Gets an extractor for a field within the object.
- *
- * @param clazz container's class
- * @param name name of the field whose value is to be extracted
- * @return a new extractor, or {@code null} if the class does not
- * contain the given field
- * @throws ExtractorException if the field is inaccessible
- */
- private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
- Field field = getClassField(clazz, name);
- if (field == null) {
- return null;
- }
-
- return Pair.of(new FieldExtractor(field), field.getType());
- }
-
- /**
- * Gets an extractor for an item within a Map object.
- *
- * @param clazz container's class
- * @param key item key within the map
- * @return a new extractor, or {@code null} if the class is not a Map
- * subclass
- */
- private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
- if (!Map.class.isAssignableFrom(clazz)) {
- return null;
- }
-
- /*
- * Don't know the value's actual type, so we'll assume it's a Map
- * for now. Things should still work OK, as this is only used to
- * direct the constructor on what type of extractor to create next.
- * If the object turns out not to be a map, then the MapExtractor
- * for the next component will just return null.
- */
- return Pair.of(new MapExtractor(key), Map.class);
- }
-
- /**
- * Gets field within a class, examining all super classes and
- * interfaces.
- *
- * @param clazz class whose field is desired
- * @param name name of the desired field
- * @return the field within the class, or {@code null} if the field does
- * not exist
- * @throws ExtractorException if the field is inaccessible
- */
- private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
- if (clazz == null) {
- return null;
- }
-
- try {
- return clazz.getField(name);
-
- } catch (NoSuchFieldException expected) {
- // no field by this name - try super class & interfaces
- logger.debug("no field {} in {}", name, clazz.getName(), expected);
- return null;
-
- } catch (SecurityException e) {
- throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
- }
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
deleted file mode 100644
index 5e02d602..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/Extractor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
- /**
- * Extracts an object contained within another object.
- *
- * @param object object from which to extract the contained object
- * @return the extracted value, or {@code null} if it cannot be extracted
- */
- Object extract(Object object);
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
deleted file mode 100644
index a864672b..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/ExtractorException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public ExtractorException() {
- super();
- }
-
- public ExtractorException(String message) {
- super(message);
- }
-
- public ExtractorException(Throwable cause) {
- super(cause);
- }
-
- public ExtractorException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
deleted file mode 100644
index 9389ab22..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/FieldExtractor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
- /**
- * Field containing the object.
- */
- private final Field field;
-
- /**
- * Constructor.
- *
- * @param field field containing the object
- */
- public FieldExtractor(Field field) {
- this.field = field;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return field.get(object);
-
- } catch (IllegalAccessException | IllegalArgumentException e) {
- logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
deleted file mode 100644
index 9c5be5ff..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MapExtractor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
- /**
- * Key to the item to extract from the map.
- */
- private final String key;
-
- /**
- * Constructor.
- *
- * @param key key to the item to extract from the map
- */
- public MapExtractor(String key) {
- this.key = key;
- }
-
- @Override
- public Object extract(Object object) {
-
- if (object instanceof Map) {
- Map<?, ?> map = (Map<?, ?>) object;
-
- return map.get(key);
-
- } else {
- logger.warn("expecting a map, instead of {}", object.getClass());
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
deleted file mode 100644
index 3efef5ec..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/extractor/MethodExtractor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
- /**
- * Method to invoke to extract the contained object.
- */
- private final Method method;
-
- /**
- * Constructor.
- *
- * @param method method to invoke to extract the contained object
- */
- public MethodExtractor(Method method) {
- this.method = method;
- }
-
- @Override
- public Object extract(Object object) {
- try {
- return method.invoke(object);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
- return null;
- }
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index 6be080f7..c78fb17b 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,7 +63,7 @@ public class BucketAssignments {
/**
* Constructor.
- *
+ *
* @param hostArray maps a bucket number (i.e., array index) to a host. All values
* must be non-null
*/
@@ -81,7 +81,7 @@ public class BucketAssignments {
/**
* Gets the leader, which is the host with the minimum UUID.
- *
+ *
* @return the assignment leader
*/
public String getLeader() {
@@ -103,7 +103,7 @@ public class BucketAssignments {
/**
* Determines if a host has an assignment.
- *
+ *
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
@@ -123,7 +123,7 @@ public class BucketAssignments {
/**
* Gets all of the hosts that have an assignment.
- *
+ *
* @return all of the hosts that have an assignment
*/
public Set<String> getAllHosts() {
@@ -143,7 +143,7 @@ public class BucketAssignments {
/**
* Gets the host assigned to a given bucket.
- *
+ *
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
@@ -153,12 +153,12 @@ public class BucketAssignments {
return null;
}
- return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+ return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
}
/**
* Gets the number of buckets.
- *
+ *
* @return the number of buckets
*/
public int size() {
@@ -168,7 +168,7 @@ public class BucketAssignments {
/**
* Checks the validity of the assignments, verifying that all buckets have been
* assigned to a host.
- *
+ *
* @throws PoolingFeatureException if the assignments are invalid
*/
public void checkValidity() throws PoolingFeatureException {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
deleted file mode 100644
index 3c34e971..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
- /**
- * Number of hops (i.e., number of times it's been forwarded) so far.
- */
- private int numHops;
-
- /**
- * Time, in milliseconds, at which the message was created.
- */
- private long createTimeMs;
-
- /**
- * Protocol of the receiving topic.
- */
- private CommInfrastructure protocol;
-
- /**
- * Topic on which the event was received.
- */
- private String topic;
-
- /**
- * The event pay load that was received on the topic.
- */
- private String payload;
-
- /**
- * The request id that was extracted from the event.
- */
- private String requestId;
-
- /**
- * Constructor.
- */
- public Forward() {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param source host on which the message originated
- * @param protocol protocol
- * @param topic topic
- * @param payload the actual event data received on the topic
- * @param requestId request id
- */
- public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
- super(source);
-
- this.numHops = 0;
- this.createTimeMs = System.currentTimeMillis();
- this.protocol = protocol;
- this.topic = topic;
- this.payload = payload;
- this.requestId = requestId;
- }
-
- /**
- * Increments {@link #numHops}.
- */
- public void bumpNumHops() {
- ++numHops;
- }
-
- public int getNumHops() {
- return numHops;
- }
-
- public void setNumHops(int numHops) {
- this.numHops = numHops;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public void setCreateTimeMs(long createTimeMs) {
- this.createTimeMs = createTimeMs;
- }
-
- public CommInfrastructure getProtocol() {
- return protocol;
- }
-
- public void setProtocol(CommInfrastructure protocol) {
- this.protocol = protocol;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public boolean isExpired(long minCreateTimeMs) {
- return (createTimeMs < minCreateTimeMs);
-
- }
-
- @Override
- public void checkValidity() throws PoolingFeatureException {
-
- super.checkValidity();
-
- if (protocol == null) {
- throw new PoolingFeatureException("missing message protocol");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new PoolingFeatureException("missing message topic");
- }
-
- /*
- * Note: an empty pay load is OK, as an empty message could have been
- * received on the topic.
- */
-
- if (payload == null) {
- throw new PoolingFeatureException("missing message payload");
- }
-
- if (requestId == null || requestId.isEmpty()) {
- throw new PoolingFeatureException("missing message requestId");
- }
-
- if (numHops < 0) {
- throw new PoolingFeatureException("invalid message hop count");
- }
- }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
deleted file mode 100644
index 5b7012d2..00000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/FilterUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
- // message element names
- public static final String MSG_CHANNEL = "channel";
- public static final String MSG_TIMESTAMP = "timestampMs";
-
- // json element names
- protected static final String JSON_CLASS = "class";
- protected static final String JSON_FILTERS = "filters";
- protected static final String JSON_FIELD = "field";
- protected static final String JSON_VALUE = "value";
-
- // values to be stuck into the "class" element
- protected static final String CLASS_OR = "Or";
- protected static final String CLASS_AND = "And";
- protected static final String CLASS_EQUALS = "Equals";
-
- /**
- * Constructor.
- */
- private FilterUtils() {
- super();
- }
-
- /**
- * Makes a filter that verifies that a field equals a value.
- *
- * @param field name of the field to check
- * @param value desired value
- * @return a map representing an "equals" filter
- */
- public static Map<String, Object> makeEquals(String field, String value) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_EQUALS);
- map.put(JSON_FIELD, field);
- map.put(JSON_VALUE, value);
-
- return map;
- }
-
- /**
- * Makes an "and" filter, where all of the items must be true.
- *
- * @param items items to be checked
- * @return an "and" filter
- */
- public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_AND);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-
- /**
- * Makes an "or" filter, where at least one of the items must be true.
- *
- * @param items items to be checked
- * @return an "or" filter
- */
- public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
- Map<String, Object> map = new TreeMap<>();
- map.put(JSON_CLASS, CLASS_OR);
- map.put(JSON_FILTERS, items);
-
- return map;
- }
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
index 59d264ec..c5761bfd 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/StartState.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,16 +20,8 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +41,7 @@ public class StartState extends State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public StartState(PoolingManager mgr) {
@@ -58,7 +50,7 @@ public class StartState extends State {
/**
* Get Heart beat time stamp in milliseconds.
- *
+ *
* @return the time stamp inserted into the heart beat message
*/
public long getHbTimestampMs() {
@@ -110,14 +102,4 @@ public class StartState extends State {
return null;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, Object> getFilter() {
- // ignore everything except our most recent heart beat message
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
- makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
- }
-
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index f2277be3..853c24d4 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -20,22 +20,15 @@
package org.onap.policy.drools.pooling.state;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
@@ -71,18 +64,6 @@ public abstract class State {
}
/**
- * Gets the server-side filter to use when polling the DMaaP internal topic. The
- * default method returns a filter that accepts messages on the admin channel and on
- * the host's own channel.
- *
- * @return the server-side filter to use.
- */
- @SuppressWarnings("unchecked")
- public Map<String, Object> getFilter() {
- return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
- }
-
- /**
* Cancels the timers added by this state.
*/
public final void cancelTimers() {
@@ -142,26 +123,6 @@ public abstract class State {
}
/**
- * Processes a message. The default method passes it to the manager to handle and
- * returns {@code null}.
- *
- * @param msg message to be processed
- * @return the new state, or {@code null} if the state is unchanged
- */
- public State process(Forward msg) {
- if (getHost().equals(msg.getChannel())) {
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
-
- } else {
- logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
- getTopic());
- }
-
- return null;
- }
-
- /**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
@@ -293,16 +254,6 @@ public abstract class State {
* @param channel channel
* @param msg message to be published
*/
- protected final void publish(String channel, Forward msg) {
- mgr.publish(channel, msg);
- }
-
- /**
- * Publishes a message on the specified channel.
- *
- * @param channel channel
- * @param msg message to be published
- */
protected final void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}