aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java23
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java40
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java2
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java2
5 files changed, 43 insertions, 28 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 2bec4579..1e2071ab 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -23,6 +23,7 @@ package org.onap.policy.drools.pooling;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
@@ -67,7 +68,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
* Maps a controller name to its associated manager.
*/
- private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+ private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+ /**
+ * Decremented each time a manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
* Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
@@ -102,6 +108,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
return host;
}
+ /**
+ * @return a latch that will be decremented when a manager enters the active state
+ */
+ protected CountDownLatch getActiveLatch() {
+ return activeLatch;
+ }
+
@Override
public int getSequenceNumber() {
return 0;
@@ -135,7 +148,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
PoolingProperties props = new PoolingProperties(name, featProps);
logger.info("pooling enabled for {}", name);
- ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
+ ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
} catch (PropertyException e) {
logger.error("pooling disabled due to exception for {}", name, e);
@@ -386,10 +399,12 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @param host name/uuid of this host
* @param controller
* @param props properties to use to configure the manager
+ * @param activeLatch decremented when the manager goes Active
* @return a new pooling manager
*/
- public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
- return new PoolingManagerImpl(host, controller, props);
+ public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
+ return new PoolingManagerImpl(host, controller, props, activeLatch);
}
/**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index d2312469..de25e471 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -94,6 +94,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final TopicListener listener;
/**
+ * Decremented each time the manager enters the Active state. Used by junit tests.
+ */
+ private final CountDownLatch activeLatch;
+
+ /**
* Used to encode & decode request objects received from & sent to a rule engine.
*/
private final Serializer serializer;
@@ -142,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/**
* Queue used when no bucket assignments are available.
*/
- private EventQueue eventq;
+ private final EventQueue eventq;
/**
* {@code True} if events offered by the controller should be intercepted,
@@ -156,11 +161,15 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @param host name/uuid of this host
* @param controller controller with which this is associated
* @param props feature properties specific to the controller
+ * @param activeLatch latch to be decremented each time the manager enters the Active
+ * state
*/
- public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+ public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+ CountDownLatch activeLatch) {
this.host = host;
this.controller = controller;
this.props = props;
+ this.activeLatch = activeLatch;
try {
this.listener = (TopicListener) controller;
@@ -237,13 +246,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* @return DMaaP properties
*/
private Properties makeDmaapProps(PolicyController controller, Properties source) {
- SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+ SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
// could be UEB or DMAAP, so add both
- addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
- addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+ addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
- return props;
+ return specProps;
}
/**
@@ -255,7 +264,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
*/
private void addDmaapConsumerProps(SpecProperties props, String prefix) {
String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-
+
props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
}
@@ -429,12 +438,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
@@ -444,12 +448,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
TimeUnit.MILLISECONDS);
// wrap the future in a "CancellableScheduledTask"
- return new CancellableScheduledTask() {
- @Override
- public void cancel() {
- fut.cancel(false);
- }
- };
+ return () -> fut.cancel(false);
}
@Override
@@ -633,7 +632,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
topic);
} else {
- logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
+ logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
event.bumpNumHops();
publish(target, event);
}
@@ -829,6 +828,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
@Override
public State goActive() {
+ activeLatch.countDown();
return new ActiveState(this);
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
index b62ea0a7..c831f706 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -107,11 +107,11 @@ public class SpecProperties extends Properties {
@Override
public final int hashCode() {
- throw new UnsupportedOperationException("HostBucket cannot be hashed");
+ throw new UnsupportedOperationException("SpecProperties cannot be hashed");
}
@Override
public final boolean equals(Object obj) {
- throw new UnsupportedOperationException("cannot compare HostBuckets");
+ throw new UnsupportedOperationException("cannot compare SpecProperties");
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index da044252..f717aa52 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -45,7 +45,7 @@ public class InactiveState extends State {
@Override
public void start() {
super.start();
- schedule(getProperties().getReactivateMs(), () -> goStart());
+ schedule(getProperties().getReactivateMs(), this::goStart);
}
@Override
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 54e93230..545c2ef0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -122,7 +122,7 @@ public abstract class State {
protected State goActive(BucketAssignments asgn) {
startDistributing(asgn);
- if (asgn.hasAssignment(getHost())) {
+ if (asgn != null && asgn.hasAssignment(getHost())) {
return mgr.goActive();
} else {