diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java | 108 |
1 files changed, 67 insertions, 41 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 422efdd7..d2312469 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -25,12 +25,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; import java.util.Properties; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.event.comm.TopicListener; @@ -47,6 +45,7 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; +import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -75,11 +74,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private static Factory factory = new Factory(); /** - * ID of the last host that was created. - */ - private static final AtomicReference<String> lastHost = new AtomicReference<>(null); - - /** * ID of this host. */ private final String host; @@ -158,28 +152,24 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Constructs the manager, initializing all of the data structures. - * + * + * @param host name/uuid of this host * @param controller controller with which this is associated * @param props feature properties specific to the controller */ - public PoolingManagerImpl(PolicyController controller, PoolingProperties props) { - this.host = UUID.randomUUID().toString(); + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) { + this.host = host; this.controller = controller; this.props = props; - lastHost.set(this.host); - try { this.listener = (TopicListener) controller; this.serializer = new Serializer(); this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); - - SpecProperties spec = new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), - props.getSource()); - this.extractors = factory.makeClassExtractors(spec); - - this.dmaapMgr = factory.makeDmaapManager(props); + this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), + makeDmaapProps(controller, props.getSource())); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -203,16 +193,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Used by junit tests. - * - * @return the ID of the last host that was created, or {@code null} if no hosts have - * been created yet - */ - protected static String getLastHost() { - return lastHost.get(); - } - - /** * Should only be used by junit tests. * * @return the current state @@ -237,6 +217,50 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** + * Makes properties for configuring extractors. + * + * @param controller the controller for which the extractors will be configured + * @param source properties from which to get the extractor properties + * @return extractor properties + */ + private Properties makeExtractorProps(PolicyController controller, Properties source) { + return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source); + } + + /** + * Makes properties for configuring DMaaP. Copies properties from the source that + * start with the Pooling property prefix followed by the controller name, stripping + * the prefix and controller name. + * + * @param controller the controller for which DMaaP will be configured + * @param source properties from which to get the DMaaP properties + * @return DMaaP properties + */ + private Properties makeDmaapProps(PolicyController controller, Properties source) { + SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source); + + // could be UEB or DMAAP, so add both + addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + + return props; + } + + /** + * Adds DMaaP consumer properties, consumer group & instance. The group is the host + * and the instance is a constant. + * + * @param props where to add the new properties + * @param prefix property prefix + */ + private void addDmaapConsumerProps(SpecProperties props, String prefix) { + String fullpfx = props.getSpecPrefix() + prefix + "." + topic; + + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); + } + + /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. * @@ -288,8 +312,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { scheduler = null; if (!(current instanceof IdleState)) { - dmaapMgr.stopConsumer(this); changeState(new IdleState(this)); + dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } } @@ -751,26 +775,27 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public CountDownLatch startDistributing(BucketAssignments asgn) { - if (asgn == null) { - return null; - } - - logger.info("new assignments for topic {}", getTopic()); - synchronized (curLocker) { + int sz = (asgn == null ? 0 : asgn.getAllHosts().size()); + logger.info("new assignments for {} hosts on topic {}", sz, getTopic()); assignments = asgn; } + if (asgn == null) { + return null; + } + /* * publish the events from the event queue, but do it in a background thread so * that the state machine can enter its correct state BEFORE we start processing * the events */ CountDownLatch latch = new CountDownLatch(1); - + new Thread(() -> { synchronized (curLocker) { if (assignments == null) { + latch.countDown(); return; } @@ -779,11 +804,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { while ((ev = eventq.poll()) != null) { handle(ev); } - + latch.countDown(); } }).start(); - + return latch; } @@ -876,12 +901,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Creates a DMaaP manager. * - * @param props properties used to configure the manager + * @param topic name of the internal DMaaP topic + * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException { - return new DmaapManager(props.getPoolingTopic(), props.getSource()); + public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { + return new DmaapManager(topic, props); } /** |