diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main')
-rw-r--r-- | feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties | 89 | ||||
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java | 63 | ||||
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java | 67 | ||||
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java | 44 | ||||
-rw-r--r-- | feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI (renamed from feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI) | 0 |
5 files changed, 153 insertions, 110 deletions
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties new file mode 100644 index 00000000..64a6b063 --- /dev/null +++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties @@ -0,0 +1,89 @@ +### +# ============LICENSE_START======================================================= +# feature-pooling-dmaap +# ================================================================================ +# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +# In general, the feature-specific properties begin with "pooling", +# and they may be made specific to a controller by prepending with +# "pooling.<controller-name>", instead. +# +# The available properties and their default values are shown below. + +# Whether or not the feature is enabled. +#pooling.enabled=false + +# The internal DMaaP topic used by a controller. Note: the controller +# name is required for this property. +#pooling.<controller-name>.topic = + +# Maximum number of events to retain in the queue while a new host waits +# to be assigned work. +#pooling.offline.queue.limit=1000 + +# Maximum age, in milliseconds, of events to be retained in the queue. +# Events older than this are discarded. +#pooling.offline.queue.age.milliseconds=60000 + +# Time, in milliseconds, to wait for an "Offline" message to be published +# to DMaaP before the connection may be closed. +#pooling.offline.publish.wait.milliseconds=3000 + +# Time, in milliseconds, to wait for this host's initial heart beat. This +# is used to verify connectivity to the internal DMaaP topic. +#pooling.start.heartbeat.milliseconds=100000 + +# Time, in milliseconds, to wait before attempting to re-active this +# host when it was not assigned any work. +#pooling.reactivate.milliseconds=50000 + +# Time, in milliseconds, to wait for other hosts to identify themselves +# when this host is started. +#pooling.identification.milliseconds=50000 + +# Time, in milliseconds, to wait for heart beats from this host, or its +# predecessor, during the active state. +#pooling.active.heartbeat.milliseconds=50000 + +# Time, in milliseconds, to wait between heart beat generations. +#pooling.inter.heartbeat.milliseconds=15000 + +# Topic used for inter-host communication for a particular controller +# pooling.<controller-name>.topic=XXX + +# These specify how the request id is to be extracted from each type of +# object that may be presented to a controller from distributed topics +# (i.e., topics whose events are to be distributed among multiple hosts) +extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId} +extractor.requestId.org.onap.policy.appc.Response=${commonHeader.requestId} +extractor.requestId.org.onap.policy.appclcm.LcmResponseWrapper=${body.commonHeader.requestId} + + +pooling.amsterdam.enabled=true +pooling.amsterdam.topic=${{AMSTERDAM_POOLING_TOPIC}} + +# the list of sources and sinks should be identical +ueb.source.topics=${{AMSTERDAM_POOLING_TOPIC}} +ueb.sink.topics=${{AMSTERDAM_POOLING_TOPIC}} + +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}} +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey= +ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret= + +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}} +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey= +ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret= diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java index abb4da33..eb41f803 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java @@ -21,7 +21,6 @@ package org.onap.policy.drools.pooling; import java.util.List; -import java.util.Properties; import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicEndpoint; import org.onap.policy.drools.event.comm.TopicListener; @@ -31,7 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Manages the internal DMaaP topic. + * Manages the internal DMaaP topic. Assumes all topics are managed by + * {@link TopicEndpoint#manager}. */ public class DmaapManager { @@ -58,18 +58,6 @@ public class DmaapManager { private final TopicSink topicSink; /** - * Topic sources. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSource> sources; - - /** - * Topic sinks. In theory, there's only one item in this list, the internal DMaaP - * topic. - */ - private final List<TopicSink> sinks; - - /** * {@code True} if the consumer is running, {@code false} otherwise. */ private boolean consuming = false; @@ -83,17 +71,14 @@ public class DmaapManager { * Constructs the manager, but does not start the source or sink. * * @param topic name of the internal DMaaP topic - * @param props properties to configure the topic source & sink * @throws PoolingFeatureException if an error occurs */ - public DmaapManager(String topic, Properties props) throws PoolingFeatureException { + public DmaapManager(String topic) throws PoolingFeatureException { logger.info("initializing bus for topic {}", topic); try { this.topic = topic; - this.sources = factory.initTopicSources(props); - this.sinks = factory.initTopicSinks(props); this.topicSource = findTopicSource(); this.topicSink = findTopicSink(); @@ -132,7 +117,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the source doesn't exist or is not filterable */ private FilterableTopicSource findTopicSource() throws PoolingFeatureException { - for (TopicSource src : sources) { + for (TopicSource src : factory.getTopicSources()) { if (topic.equals(src.getTopic())) { if (src instanceof FilterableTopicSource) { return (FilterableTopicSource) src; @@ -153,7 +138,7 @@ public class DmaapManager { * @throws PoolingFeatureException if the sink doesn't exist */ private TopicSink findTopicSink() throws PoolingFeatureException { - for (TopicSink sink : sinks) { + for (TopicSink sink : factory.getTopicSinks()) { if (topic.equals(sink.getTopic())) { return sink; } @@ -164,22 +149,14 @@ public class DmaapManager { /** * Starts the publisher, if it isn't already running. - * - * @throws PoolingFeatureException if an error occurs */ - public void startPublisher() throws PoolingFeatureException { + public void startPublisher() { if (publishing) { return; } - try { - logger.info("start publishing to topic {}", topic); - topicSink.start(); - publishing = true; - - } catch (IllegalStateException e) { - throw new PoolingFeatureException("cannot start topic sink " + topic, e); - } + logger.info("start publishing to topic {}", topic); + publishing = true; } /** @@ -205,14 +182,8 @@ public class DmaapManager { Thread.currentThread().interrupt(); } - try { - logger.info("stop publishing to topic {}", topic); - publishing = false; - topicSink.stop(); - - } catch (IllegalStateException e) { - logger.error("cannot stop sink for topic {}", topic, e); - } + logger.info("stop publishing to topic {}", topic); + publishing = false; } /** @@ -288,23 +259,17 @@ public class DmaapManager { public static class Factory { /** - * Initializes the topic sources. - * - * @param props properties used to configure the topics * @return the topic sources */ - public List<TopicSource> initTopicSources(Properties props) { - return TopicEndpoint.manager.addTopicSources(props); + public List<TopicSource> getTopicSources() { + return TopicEndpoint.manager.getTopicSources(); } /** - * Initializes the topic sinks. - * - * @param props properties used to configure the topics * @return the topic sinks */ - public List<TopicSink> initTopicSinks(Properties props) { - return TopicEndpoint.manager.addTopicSinks(props); + public List<TopicSink> getTopicSinks() { + return TopicEndpoint.manager.getTopicSinks(); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 1e2071ab..67cb21ee 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -20,13 +20,16 @@ package org.onap.policy.drools.pooling; +import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.event.comm.TopicEndpoint; +import org.onap.policy.drools.event.comm.TopicSink; +import org.onap.policy.drools.event.comm.TopicSource; import org.onap.policy.drools.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; @@ -53,7 +56,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Factory used to create objects. */ - private static Factory factory; + private static Factory factory = new Factory(); /** * ID of this host. @@ -124,6 +127,10 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public boolean beforeStart(PolicyEngine engine) { logger.info("initializing " + PoolingProperties.FEATURE_NAME); featProps = factory.getProperties(PoolingProperties.FEATURE_NAME); + + factory.initTopicSources(featProps); + factory.initTopicSinks(featProps); + return false; } @@ -189,17 +196,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF @Override public boolean afterStop(PolicyController controller) { - - // NOTE: using doDeleteManager() instead of doManager() - - return doDeleteManager(controller, mgr -> { - + return doManager(controller, mgr -> { mgr.afterStop(); return false; }); } @Override + public boolean afterShutdown(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override + public boolean afterHalt(PolicyController controller) { + deleteManager(controller); + return false; + } + + @Override public boolean beforeLock(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeLock(); @@ -306,29 +321,17 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF } /** - * Executes a function using the manager associated with the controller and then - * deletes the manager. Catches any exceptions from the function and re-throws it as a - * runtime exception. + * Deletes the manager associated with a controller. * * @param controller - * @param func function to be executed - * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ - private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) { + private void deleteManager(PolicyController controller) { String name = controller.getName(); logger.info("remove feature-pool-dmaap manager for {}", name); - // NOTE: using "remove()" instead of "get()" - - PoolingManagerImpl mgr = ctlr2pool.remove(name); - - if (mgr == null) { - return false; - } - - return func.apply(mgr); + ctlr2pool.remove(name); } /** @@ -416,5 +419,25 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF public PolicyController getController(DroolsController droolsController) { return PolicyController.factory.get(droolsController); } + + /** + * Initializes the topic sources. + * + * @param props properties used to configure the topics + * @return the topic sources + */ + public List<TopicSource> initTopicSources(Properties props) { + return TopicEndpoint.manager.addTopicSources(props); + } + + /** + * Initializes the topic sinks. + * + * @param props properties used to configure the topics + * @return the topic sinks + */ + public List<TopicSink> initTopicSinks(Properties props) { + return TopicEndpoint.manager.addTopicSinks(props); + } } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index de25e471..33f45085 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -45,7 +45,6 @@ import org.onap.policy.drools.pooling.state.QueryState; import org.onap.policy.drools.pooling.state.StartState; import org.onap.policy.drools.pooling.state.State; import org.onap.policy.drools.pooling.state.StateTimerTask; -import org.onap.policy.drools.properties.PolicyProperties; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.system.PolicyController; import org.slf4j.Logger; @@ -177,8 +176,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { this.topic = props.getPoolingTopic(); this.eventq = factory.makeEventQueue(props); this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource())); - this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(), - makeDmaapProps(controller, props.getSource())); + this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic()); this.current = new IdleState(this); logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic); @@ -237,39 +235,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Makes properties for configuring DMaaP. Copies properties from the source that - * start with the Pooling property prefix followed by the controller name, stripping - * the prefix and controller name. - * - * @param controller the controller for which DMaaP will be configured - * @param source properties from which to get the DMaaP properties - * @return DMaaP properties - */ - private Properties makeDmaapProps(PolicyController controller, Properties source) { - SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); - - // could be UEB or DMAAP, so add both - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - - return specProps; - } - - /** - * Adds DMaaP consumer properties, consumer group & instance. The group is the host - * and the instance is a constant. - * - * @param props where to add the new properties - * @param prefix property prefix - */ - private void addDmaapConsumerProps(SpecProperties props, String prefix) { - String fullpfx = props.getSpecPrefix() + prefix + "." + topic; - - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); - props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); - } - - /** * Indicates that the controller is about to start. Starts the publisher for the * internal topic, and creates a thread pool for the timers. * @@ -325,6 +290,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { dmaapMgr.stopConsumer(this); publishAdmin(new Offline(getHost())); } + + assignments = null; } if (sched != null) { @@ -902,12 +869,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * Creates a DMaaP manager. * * @param topic name of the internal DMaaP topic - * @param props properties used to configure DMaaP * @return a new DMaaP manager * @throws PoolingFeatureException if an error occurs */ - public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException { - return new DmaapManager(topic, props); + public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException { + return new DmaapManager(topic); } /** diff --git a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI index cd59e469..cd59e469 100644 --- a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI +++ b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI |