summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java221
1 files changed, 27 insertions, 194 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index b2966101..9093c7c0 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,19 +23,14 @@ package org.onap.policy.drools.pooling;
import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
@@ -83,11 +78,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final PolicyController controller;
/**
- * Where to offer events that have been forwarded to this host (i.e, the controller).
- */
- private final TopicListener listener;
-
- /**
* Decremented each time the manager enters the Active state. Used by junit tests.
*/
private final CountDownLatch activeLatch;
@@ -108,11 +98,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final DmaapManager dmaapMgr;
/**
- * Used to extract the request id from the decoded message.
- */
- private final ClassExtractors extractors;
-
- /**
* Lock used while updating {@link #current}. In general, public methods must use
* this, while private methods assume the lock is already held.
*/
@@ -139,12 +124,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private ScheduledThreadPoolExecutor scheduler = null;
/**
- * {@code True} if events offered by the controller should be intercepted,
- * {@code false} otherwise.
- */
- private boolean intercept = true;
-
- /**
* Constructs the manager, initializing all of the data structures.
*
* @param host name/uuid of this host
@@ -161,10 +140,8 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
this.activeLatch = activeLatch;
try {
- this.listener = (TopicListener) controller;
this.serializer = new Serializer();
this.topic = props.getPoolingTopic();
- this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
this.current = new IdleState(this);
@@ -207,17 +184,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes properties for configuring extractors.
- *
- * @param controller the controller for which the extractors will be configured
- * @param source properties from which to get the extractor properties
- * @return extractor properties
- */
- private Properties makeExtractorProps(PolicyController controller, Properties source) {
- return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
- }
-
- /**
* Indicates that the controller is about to start. Starts the publisher for the
* internal topic, and creates a thread pool for the timers.
*/
@@ -332,29 +298,10 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
current.cancelTimers();
current = newState;
- // set the filter before starting the state
- setFilter(newState.getFilter());
newState.start();
}
}
- /**
- * Sets the server-side filter for the internal topic.
- *
- * @param filter new filter to be used
- */
- private void setFilter(Map<String, Object> filter) {
- try {
- dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
- } catch (JsonParseException e) {
- logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
- } catch (PoolingFeatureException e) {
- logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
- }
- }
-
@Override
public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
// wrap the task in a TimerAction and schedule it
@@ -430,112 +377,91 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
* and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
* it instead, as it already has the decoded message.
*
- * @param protocol protocol
* @param topic2 topic
* @param event event
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+ public boolean beforeOffer(String topic2, String event) {
- if (!controller.isLocked() || !intercept) {
+ if (!controller.isLocked()) {
// we should NOT intercept this message - let the invoker handle it
return false;
}
- return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ return handleExternal(topic2, decodeEvent(topic2, event));
}
/**
* Called by the DroolsController before it inserts the event into the rule engine.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event original event text, as received from the Bus
- * @param event2 event, as an object
+ * @param event event, as an object
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
- if (!intercept) {
- // we should NOT intercept this message - let the invoker handle it
- return false;
- }
-
- return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ public boolean beforeInsert(String topic2, Object event) {
+ return handleExternal(topic2, event);
}
/**
* Handles an event from an external topic.
*
- * @param protocol protocol
* @param topic2 topic
- * @param event event
- * @param reqid request id extracted from the event, or {@code null} if it couldn't be
- * extracted
+ * @param event event, as an object, or {@code null} if it cannot be decoded
* @return {@code true} if the event was handled by the manager, {@code false} if it
* must still be handled by the invoker
*/
- private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
- if (reqid == null) {
- // no request id - let the invoker handle it
- return false;
- }
-
- if (reqid.isEmpty()) {
- logger.warn("handle locally due to empty request id for topic {}", topic2);
- // no request id - let the invoker handle it
+ private boolean handleExternal(String topic2, Object event) {
+ if (event == null) {
+ // no event - let the invoker handle it
return false;
}
- Forward ev = makeForward(protocol, topic2, event, reqid);
- if (ev == null) {
- // invalid args - consume the message
- logger.warn("constructed an invalid Forward message on topic {}", getTopic());
- return true;
- }
-
synchronized (curLocker) {
- return handleExternal(ev);
+ return handleExternal(topic2, event, event.hashCode());
}
}
/**
* Handles an event from an external topic.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleExternal(Forward event) {
+ private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
// no bucket assignments yet - handle locally
- logger.info("handle event locally for request {}", event.getRequestId());
+ logger.info("handle event locally for request {}", event);
// we did NOT consume the event
return false;
} else {
- return handleEvent(event);
+ return handleEvent(topic2, event, eventHashCode);
}
}
/**
* Handles a {@link Forward} event, possibly forwarding it again.
*
- * @param event event
+ * @param topic2 topic
+ * @param event event, as an object
+ * @param eventHashCode event's hash code
* @return {@code true} if the event was handled, {@code false} if the invoker should
* handle it
*/
- private boolean handleEvent(Forward event) {
- String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+ private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+ String target = assignments.getAssignedHost(eventHashCode);
if (target == null) {
/*
* This bucket has no assignment - just discard the event
*/
- logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+ logger.warn("discarded event for unassigned bucket from topic {}", topic2);
return true;
}
@@ -543,41 +469,16 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
/*
* Message belongs to this host - allow the controller to handle it.
*/
- logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+ logger.info("handle local event for request {} from topic {}", event, topic2);
return false;
}
- // forward to a different host, if hop count has been exhausted
- if (event.getNumHops() > MAX_HOPS) {
- logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
- topic);
-
- } else {
- logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
- event.bumpNumHops();
- publish(target, event);
- }
-
- // either way, consume the event
+ // not our message, consume the event
+ logger.warn("discarded event for host {} from topic {}", target, topic2);
return true;
}
/**
- * Extract the request id from an event object.
- *
- * @param event the event object, or {@code null}
- * @return the event's request id, or {@code null} if it can't be extracted
- */
- private String extractRequestId(Object event) {
- if (event == null) {
- return null;
- }
-
- Object reqid = extractors.extract(event);
- return (reqid != null ? reqid.toString() : null);
- }
-
- /**
* Decodes an event from a String into an event Object.
*
* @param topic2 topic
@@ -608,59 +509,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Makes a {@link Forward}, and validates its contents.
- *
- * @param protocol protocol
- * @param topic2 topic
- * @param event event
- * @param reqid request id
- * @return a new message, or {@code null} if the message was invalid
- */
- private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
- try {
- Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
- // required for the validity check
- ev.setChannel(host);
-
- ev.checkValidity();
-
- return ev;
-
- } catch (PoolingFeatureException e) {
- logger.error("invalid message for topic {}", topic2, e);
- return null;
- }
- }
-
- @Override
- public void handle(Forward event) {
- synchronized (curLocker) {
- if (!handleExternal(event)) {
- // this host should handle it - inject it
- inject(event);
- }
- }
- }
-
- /**
- * Injects an event into the controller.
- *
- * @param event event
- */
- private void inject(Forward event) {
- logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
- try {
- intercept = false;
- listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
- } finally {
- intercept = true;
- }
- }
-
- /**
* Handles an event from the internal topic. This uses reflection to identify the
* appropriate process() method to invoke, based on the type of Message that was
* decoded.
@@ -764,21 +612,6 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
}
- /*
- * The remaining methods may be overridden by junit tests.
- */
-
- /**
- * Creates object extractors.
- *
- * @param props properties used to configure the extractors
- * @return a new set of extractors
- */
- protected ClassExtractors makeClassExtractors(Properties props) {
- return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
- PoolingProperties.EXTRACTOR_TYPE);
- }
-
/**
* Creates a DMaaP manager.
*