summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java108
1 files changed, 67 insertions, 41 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 422efdd7..d2312469 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicListener;
@@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState;
import org.onap.policy.drools.pooling.state.StartState;
import org.onap.policy.drools.pooling.state.State;
import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.system.PolicyController;
import org.slf4j.Logger;
@@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private static Factory factory = new Factory();
/**
- * ID of the last host that was created.
- */
- private static final AtomicReference<String> lastHost = new AtomicReference<>(null);
-
- /**
* ID of this host.
*/
private final String host;
@@ -158,28 +152,24 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Constructs the manager, initializing all of the data structures.
- *
+ *
+ * @param host name/uuid of this host
* @param controller controller with which this is associated
* @param props feature properties specific to the controller
*/
- public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
- this.host = UUID.randomUUID().toString();
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+ this.host = host;
this.controller = controller;
this.props = props;
- lastHost.set(this.host);
-
try {
this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
this.eventq = factory.makeEventQueue(props);
-
- SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(),
- props.getSource());
- this.extractors = factory.makeClassExtractors(spec);
-
- this.dmaapMgr = factory.makeDmaapManager(props);
+ this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
+ this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
+ makeDmaapProps(controller, props.getSource()));
this.current = new IdleState(this);
logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -203,16 +193,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Used by junit tests.
- *
- * @return the ID of the last host that was created, or {@code null} if no hosts have
- * been created yet
- */
- protected static String getLastHost() {
- return lastHost.get();
- }
-
- /**
* Should only be used by junit tests.
*
* @return the current state
@@ -237,6 +217,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
+ * Makes properties for configuring extractors.
+ *
+ * @param controller the controller for which the extractors will be configured
+ * @param source properties from which to get the extractor properties
+ * @return extractor properties
+ */
+ private Properties makeExtractorProps(PolicyController controller, Properties source) {
+ return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
+ }
+
+ /**
+ * Makes properties for configuring DMaaP. Copies properties from the source that
+ * start with the Pooling property prefix followed by the controller name, stripping
+ * the prefix and controller name.
+ *
+ * @param controller the controller for which DMaaP will be configured
+ * @param source properties from which to get the DMaaP properties
+ * @return DMaaP properties
+ */
+ private Properties makeDmaapProps(PolicyController controller, Properties source) {
+ SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+
+ // could be UEB or DMAAP, so add both
+ addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+ addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+
+ return props;
+ }
+
+ /**
+ * Adds DMaaP consumer properties, consumer group & instance. The group is the host
+ * and the instance is a constant.
+ *
+ * @param props where to add the new properties
+ * @param prefix property prefix
+ */
+ private void addDmaapConsumerProps(SpecProperties props, String prefix) {
+ String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
+
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
+ props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
+ }
+
+ /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*
@@ -288,8 +312,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
scheduler = null;
if (!(current instanceof IdleState)) {
- dmaapMgr.stopConsumer(this);
changeState(new IdleState(this));
+ dmaapMgr.stopConsumer(this);
publishAdmin(new Offline(getHost()));
}
}
@@ -751,26 +775,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public CountDownLatch startDistributing(BucketAssignments asgn) {
- if (asgn == null) {
- return null;
- }
-
- logger.info("new assignments for topic {}", getTopic());
-
synchronized (curLocker) {
+ int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
+ logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
assignments = asgn;
}
+ if (asgn == null) {
+ return null;
+ }
+
/*
* publish the events from the event queue, but do it in a background thread so
* that the state machine can enter its correct state BEFORE we start processing
* the events
*/
CountDownLatch latch = new CountDownLatch(1);
-
+
new Thread(() -> {
synchronized (curLocker) {
if (assignments == null) {
+ latch.countDown();
return;
}
@@ -779,11 +804,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
while ((ev = eventq.poll()) != null) {
handle(ev);
}
-
+
latch.countDown();
}
}).start();
-
+
return latch;
}
@@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Creates a DMaaP manager.
*
- * @param props properties used to configure the manager
+ * @param topic name of the internal DMaaP topic
+ * @param props properties used to configure DMaaP
* @return a new DMaaP manager
* @throws PoolingFeatureException if an error occurs
*/
- public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
- return new DmaapManager(props.getPoolingTopic(), props.getSource());
+ public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
+ return new DmaapManager(topic, props);
}
/**