diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main')
5 files changed, 43 insertions, 28 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index 2bec4579..1e2071ab 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; @@ -67,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF /** * Maps a controller name to its associated manager. */ - private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107); + + /** + * Decremented each time a manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch = new CountDownLatch(1); /** * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is @@ -102,6 +108,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF return host; } + /** + * @return a latch that will be decremented when a manager enters the active state + */ + protected CountDownLatch getActiveLatch() { + return activeLatch; + } + @Override public int getSequenceNumber() { return 0; @@ -135,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF PoolingProperties props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); - ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props)); + ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name, e); @@ -386,10 +399,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF * @param host name/uuid of this host * @param controller * @param props properties to use to configure the manager + * @param activeLatch decremented when the manager goes Active * @return a new pooling manager */ - public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) { - return new PoolingManagerImpl(host, controller, props); + public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { + return new PoolingManagerImpl(host, controller, props, activeLatch); } /** diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index d2312469..de25e471 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -94,6 +94,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { private final TopicListener listener; /** + * Decremented each time the manager enters the Active state. Used by junit tests. + */ + private final CountDownLatch activeLatch; + + /** * Used to encode & decode request objects received from & sent to a rule engine. */ private final Serializer serializer; @@ -142,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { /** * Queue used when no bucket assignments are available. */ - private EventQueue eventq; + private final EventQueue eventq; /** * {@code True} if events offered by the controller should be intercepted, @@ -156,11 +161,15 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param host name/uuid of this host * @param controller controller with which this is associated * @param props feature properties specific to the controller + * @param activeLatch latch to be decremented each time the manager enters the Active + * state */ - public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) { + public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props, + CountDownLatch activeLatch) { this.host = host; this.controller = controller; this.props = props; + this.activeLatch = activeLatch; try { this.listener = (TopicListener) controller; @@ -237,13 +246,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @return DMaaP properties */ private Properties makeDmaapProps(PolicyController controller, Properties source) { - SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source); + SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source); // could be UEB or DMAAP, so add both - addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); - addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS); + addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - return props; + return specProps; } /** @@ -255,7 +264,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { */ private void addDmaapConsumerProps(SpecProperties props, String prefix) { String fullpfx = props.getSpecPrefix() + prefix + "." + topic; - + props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host); props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0"); } @@ -429,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -444,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { TimeUnit.MILLISECONDS); // wrap the future in a "CancellableScheduledTask" - return new CancellableScheduledTask() { - @Override - public void cancel() { - fut.cancel(false); - } - }; + return () -> fut.cancel(false); } @Override @@ -633,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { topic); } else { - logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); + logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic()); event.bumpNumHops(); publish(target, event); } @@ -829,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { @Override public State goActive() { + activeLatch.countDown(); return new ActiveState(this); } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java index b62ea0a7..c831f706 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java @@ -107,11 +107,11 @@ public class SpecProperties extends Properties { @Override public final int hashCode() { - throw new UnsupportedOperationException("HostBucket cannot be hashed"); + throw new UnsupportedOperationException("SpecProperties cannot be hashed"); } @Override public final boolean equals(Object obj) { - throw new UnsupportedOperationException("cannot compare HostBuckets"); + throw new UnsupportedOperationException("cannot compare SpecProperties"); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java index da044252..f717aa52 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java @@ -45,7 +45,7 @@ public class InactiveState extends State { @Override public void start() { super.start(); - schedule(getProperties().getReactivateMs(), () -> goStart()); + schedule(getProperties().getReactivateMs(), this::goStart); } @Override diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 54e93230..545c2ef0 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -122,7 +122,7 @@ public abstract class State { protected State goActive(BucketAssignments asgn) { startDistributing(asgn); - if (asgn.hasAssignment(getHost())) { + if (asgn != null && asgn.hasAssignment(getHost())) { return mgr.goActive(); } else { |