From 3b6bb8e880d3b56afb7767ea7d0505ceb49f8890 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Thu, 3 May 2018 14:47:23 -0400 Subject: Fix various problems in pooling Renamed META-INF SessionAPI to EngineAPI, as it implements the latter. Created default PoolingFeature.factory object. Don't delete a controller's pooling manager when stop is called; do that in afterHalt and afterShutdown. This enables it to be restarted as long as the controller still exists. Only stop & start the internal DMaaP topic at the engine level instead of the controller level. This is necessary to prevent sinks for ALL controllers from being started each time an individual controller starts. Clear all bucket assignments when controller is stopped. Mark test methods with @Override annotation. Add default property file for pooling feature. Add license to default property file. Remove tests for doDeleteManager(), as it no longer exists. Changed " = " to "=" in the property file. Change-Id: I80c0c3f1879b5a320044db93e3dfa3b7281cda51 Issue-ID: POLICY-774 Signed-off-by: Jim Hahn --- .../onap/policy/drools/pooling/DmaapManager.java | 63 +++++--------------- .../onap/policy/drools/pooling/PoolingFeature.java | 67 +++++++++++++++------- .../policy/drools/pooling/PoolingManagerImpl.java | 44 ++------------ 3 files changed, 64 insertions(+), 110 deletions(-) (limited to 'feature-pooling-dmaap/src/main/java') diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index abb4da33..eb41f803 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.List; -import java.util.Properties; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicEndpoint; import org.onap.policy.drools.event.comm.TopicListener; @@ -31,7 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Manages the internal DMaaP topic. + * Manages the internal DMaaP topic. Assumes all topics are managed by + * {@link TopicEndpoint#manager}. */ public class DmaapManager { @@ -57,18 +57,6 @@ public class DmaapManager { */ private final TopicSink topicSink; - /** - * Topic sources. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List sources; - - /** - * Topic sinks. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List sinks; - /** * {@code True} if the consumer is running, {@code false} otherwise. */ @@ -83,17 +71,14 @@ public class DmaapManager { * Constructs the manager, but does not start the source or sink. * * @param topic name of the internal DMaaP topic - * @param props properties to configure the topic source & sink * @throws PoolingFeatureException if an error occurs */ - public DmaapManager(String topic, Properties props) throws PoolingFeatureException { + public DmaapManager(String topic) throws PoolingFeatureException { logger.info("initializing bus for topic {}", topic); try { this.topic = topic; - this.sources = factory.initTopicSources(props); - this.sinks = factory.initTopicSinks(props); this.topicSource = findTopicSource(); this.topicSink = findTopicSink(); @@ -132,7 +117,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { - for (TopicSource src : sources) { + for (TopicSource src : factory.getTopicSources()) { if (topic.equals(src.getTopic())) { if (src instanceof FilterableTopicSource) { return (FilterableTopicSource) src; @@ -153,7 +138,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the sink doesn't exist */ private TopicSink findTopicSink() throws PoolingFeatureException { - for (TopicSink sink : sinks) { + for (TopicSink sink : factory.getTopicSinks()) { if (topic.equals(sink.getTopic())) { return sink; } @@ -164,22 +149,14 @@ public class DmaapManager { /** * Starts the publisher, if it isn't already running. - * - * @throws PoolingFeatureException if an error occurs */ - public void startPublisher() throws PoolingFeatureException { + public void startPublisher() { if (publishing) { return; } - try { - logger.info("start publishing to topic {}", topic); - topicSink.start(); - publishing = true; - - } catch (IllegalStateException e) { - throw new PoolingFeatureException("cannot start topic sink " + topic, e); - } + logger.info("start publishing to topic {}", topic); + publishing = true; } /** @@ -205,14 +182,8 @@ public class DmaapManager { Thread.currentThread().interrupt(); } - try { - logger.info("stop publishing to topic {}", topic); - publishing = false; - topicSink.stop(); - - } catch (IllegalStateException e) { - logger.error("cannot stop sink for topic {}", topic, e); - } + logger.info("stop publishing to topic {}", topic); + publishing = false; } /** @@ -288,23 +259,17 @@ public class DmaapManager { public static class Factory { /** - * Initializes the topic sources. - * - * @param props properties used to configure the topics * @return the topic sources */ - public List initTopicSources(Properties props) { - return TopicEndpoint.manager.addTopicSources(props); + public List getTopicSources() { + return TopicEndpoint.manager.getTopicSources(); } /** - * Initializes the topic sinks. - * - * @param props properties used to configure the topics * @return the topic sinks */ - public List initTopicSinks(Properties props) { - return TopicEndpoint.manager.addTopicSinks(props); + public List getTopicSinks() { + return TopicEndpoint.manager.getTopicSinks(); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 1e2071ab..67cb21ee 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -20,13 +20,16 @@ package org.onap.policy.drools.pooling; +import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.TopicEndpoint; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; @@ -53,7 +56,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Factory used to create objects. */ - private static Factory factory; + private static Factory factory = new Factory(); /** * ID of this host. @@ -124,6 +127,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public boolean beforeStart(PolicyEngine engine) { logger.info("initializing " + PoolingProperties.FEATURE_NAME); featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); + + factory.initTopicSources(featProps); + factory.initTopicSinks(featProps); + return false; } @@ -189,16 +196,24 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF @Override public boolean afterStop(PolicyController controller) { - - // NOTE: using doDeleteManager() instead of doManager() - - return doDeleteManager(controller, mgr -> { - + return doManager(controller, mgr -> { mgr.afterStop(); return false; }); } + @Override + public boolean afterShutdown(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override + public boolean afterHalt(PolicyController controller) { + deleteManager(controller); + return false; + } + @Override public boolean beforeLock(PolicyController controller) { return doManager(controller, mgr -> { @@ -306,29 +321,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF } /** - * Executes a function using the manager associated with the controller and then - * deletes the manager. Catches any exceptions from the function and re-throws it as a - * runtime exception. + * Deletes the manager associated with a controller. * * @param controller - * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ - private boolean doDeleteManager(PolicyController controller, Function func) { + private void deleteManager(PolicyController controller) { String name = controller.getName(); logger.info("remove feature-pool-dmaap manager for {}", name); - // NOTE: using "remove()" instead of "get()" - - PoolingManagerImpl mgr = ctlr2pool.remove(name); - - if (mgr == null) { - return false; - } - - return func.apply(mgr); + ctlr2pool.remove(name); } /** @@ -416,5 +419,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public PolicyController getController(DroolsController droolsController) { return PolicyController.factory.get(droolsController); } + + /** + * Initializes the topic sources. + * + * @param props properties used to configure the topics + * @return the topic sources + */ + public List initTopicSources(Properties props) { + return TopicEndpoint.manager.addTopicSources(props); + } + + /** + * Initializes the topic sinks. + * + * @param props properties used to configure the topics + * @return the topic sinks + */ + public List initTopicSinks(Properties props) { + return TopicEndpoint.manager.addTopicSinks(props); + } } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index de25e471..33f45085 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -45,7 +45,6 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; -import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -177,8 +176,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); - this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), - makeDmaapProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -236,39 +234,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source); } - /** - * Makes properties for configuring DMaaP. Copies properties from the source that - * start with the Pooling property prefix followed by the controller name, stripping - * the prefix and controller name. - * - * @param controller the controller for which DMaaP will be configured - * @param source properties from which to get the DMaaP properties - * @return DMaaP properties - */ - private Properties makeDmaapProps(PolicyController controller, Properties source) { - SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); - - // could be UEB or DMAAP, so add both - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - - return specProps; - } - - /** - * Adds DMaaP consumer properties, consumer group & instance. The group is the host - * and the instance is a constant. - * - * @param props where to add the new properties - * @param prefix property prefix - */ - private void addDmaapConsumerProps(SpecProperties props, String prefix) { - String fullpfx = props.getSpecPrefix() + prefix + "." + topic; - - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); - } - /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. @@ -325,6 +290,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } + + assignments = null; } if (sched != null) { @@ -902,12 +869,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Creates a DMaaP manager. * * @param topic name of the internal DMaaP topic - * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { - return new DmaapManager(topic, props); + public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException { + return new DmaapManager(topic); } /** -- cgit 1.2.3-korg