summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java871
1 files changed, 871 insertions, 0 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
new file mode 100644
index 00000000..cd71670d
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -0,0 +1,871 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.pooling;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.drools.pooling.extractor.ClassExtractors;
+import org.onap.policy.drools.pooling.message.BucketAssignments;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Leader;
+import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.state.ActiveState;
+import org.onap.policy.drools.pooling.state.IdleState;
+import org.onap.policy.drools.pooling.state.InactiveState;
+import org.onap.policy.drools.pooling.state.QueryState;
+import org.onap.policy.drools.pooling.state.StartState;
+import org.onap.policy.drools.pooling.state.State;
+import org.onap.policy.drools.pooling.state.StateTimerTask;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
+import org.onap.policy.drools.system.PolicyController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Implementation of a {@link PoolingManager}. Until bucket assignments have
+ * been made, events coming from external topics are saved in a queue for later
+ * processing. Once assignments are made, the saved events are processed. In
+ * addition, while the controller is locked, events are still forwarded to other
+ * hosts and bucket assignments are still updated, based on any {@link Leader}
+ * messages that it receives.
+ */
+public class PoolingManagerImpl implements PoolingManager, TopicListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
+
+ // TODO metrics, audit logging
+
+ /**
+ * Maximum number of times a message can be forwarded.
+ */
+ public static final int MAX_HOPS = 5;
+
+ /**
+ * Type of item that the extractors will be extracting.
+ */
+ private static final String EXTRACTOR_TYPE = "requestId";
+
+ /**
+ * Prefix for extractor properties.
+ */
+ private static final String PROP_EXTRACTOR_PREFIX = "extractor." + EXTRACTOR_TYPE;
+
+ /**
+ * Factory used to create various objects. Can be overridden during junit
+ * testing.
+ */
+ private static Factory factory = new Factory();
+
+ /**
+ * ID of this host.
+ */
+ private final String host;
+
+ /**
+ * Properties with which this was configured.
+ */
+ private final PoolingProperties props;
+
+ /**
+ * Associated controller.
+ */
+ private final PolicyController controller;
+
+ /**
+ * Where to offer events that have been forwarded to this host (i.e, the
+ * controller).
+ */
+ private final TopicListener listener;
+
+ /**
+ * Used to encode & decode request objects received from & sent to a rule
+ * engine.
+ */
+ private final Serializer serializer;
+
+ /**
+ * Internal DMaaP topic used by this controller.
+ */
+ private final String topic;
+
+ /**
+ * Manager for the internal DMaaP topic.
+ */
+ private final DmaapManager dmaapMgr;
+
+ /**
+ * Used to extract the request id from the decoded message.
+ */
+ private final ClassExtractors extractors;
+
+ /**
+ * Lock used while updating {@link #current}. In general, public methods
+ * must use this, while private methods assume the lock is already held.
+ */
+ private final Object curLocker = new Object();
+
+ /**
+ * Current state.
+ * <p>
+ * This uses a finite state machine, wherein the state object contains all
+ * of the data relevant to that state. Each state object has a process()
+ * method, specific to each type of {@link Message} subclass. The method
+ * returns the next state object, or {@code null} if the state is to remain
+ * the same.
+ */
+ private State current;
+
+ /**
+ * Current bucket assignments or {@code null}.
+ */
+ private BucketAssignments assignments = null;
+
+ /**
+ * Pool used to execute timers.
+ */
+ private ScheduledThreadPoolExecutor scheduler = null;
+
+ /**
+ * Queue used when no bucket assignments are available.
+ */
+ private EventQueue eventq;
+
+ /**
+ * {@code True} if events offered by the controller should be intercepted,
+ * {@code false} otherwise.
+ */
+ private boolean intercept = true;
+
+ /**
+ * Constructs the manager, initializing all of the data structures.
+ *
+ * @param controller controller with which this is associated
+ * @param props feature properties specific to the controller
+ */
+ public PoolingManagerImpl(PolicyController controller, PoolingProperties props) {
+ this.host = UUID.randomUUID().toString();
+ this.controller = controller;
+ this.props = props;
+
+ try {
+ this.listener = (TopicListener) controller;
+ this.serializer = new Serializer();
+ this.topic = props.getPoolingTopic();
+ this.eventq = factory.makeEventQueue(props);
+
+ SpecProperties spec = new SpecProperties(PROP_EXTRACTOR_PREFIX, controller.getName());
+ this.extractors = factory.makeClassExtractors(spec);
+
+ this.dmaapMgr = factory.makeDmaapManager(props);
+ this.current = new IdleState(this);
+
+ logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
+
+ } catch (ClassCastException e) {
+ logger.error("not a topic listener, controller {}", controller.getName());
+ throw new PoolingFeatureRtException(e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ throw e.toRuntimeException();
+ }
+ }
+
+ protected static Factory getFactory() {
+ return factory;
+ }
+
+ protected static void setFactory(Factory factory) {
+ PoolingManagerImpl.factory = factory;
+ }
+
+ /**
+ * Should only be used by junit tests.
+ *
+ * @return the current state
+ */
+ protected State getCurrent() {
+ synchronized (curLocker) {
+ return current;
+ }
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public PoolingProperties getProperties() {
+ return props;
+ }
+
+ /**
+ * Indicates that the controller is about to start. Starts the publisher for
+ * the internal topic, and creates a thread pool for the timers.
+ *
+ * @throws PoolingFeatureException if the internal topic publisher cannot be
+ * started
+ */
+ public void beforeStart() throws PoolingFeatureException {
+ synchronized (curLocker) {
+ if (scheduler == null) {
+ dmaapMgr.startPublisher();
+
+ scheduler = factory.makeScheduler();
+
+ /*
+ * Only a handful of timers at any moment, thus we can afford to
+ * take the time to remove them when they're cancelled.
+ */
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setMaximumPoolSize(1);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller has successfully started. Starts the
+ * consumer for the internal topic, enters the {@link StartState}, and sets
+ * the filter for the initial state.
+ */
+ public void afterStart() {
+ synchronized (curLocker) {
+ if (current instanceof IdleState) {
+ dmaapMgr.startConsumer(this);
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to stop. Stops the consumer, the
+ * scheduler, and the current state.
+ */
+ public void beforeStop() {
+ ScheduledThreadPoolExecutor sched;
+
+ synchronized (curLocker) {
+ sched = scheduler;
+ scheduler = null;
+
+ if (!(current instanceof IdleState)) {
+ dmaapMgr.stopConsumer(this);
+ changeState(new IdleState(this));
+
+ // TODO
+ /*
+ * Need a brief delay here to allow "offline" message to be
+ * transmitted?
+ */
+ }
+ }
+
+ if (sched != null) {
+ sched.shutdownNow();
+ }
+ }
+
+ /**
+ * Indicates that the controller has stopped. Stops the publisher and logs a
+ * warning if any events are still in the queue.
+ */
+ public void afterStop() {
+ synchronized (curLocker) {
+ if (!eventq.isEmpty()) {
+ logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
+ eventq.clear();
+ }
+
+ dmaapMgr.stopPublisher();
+ }
+ }
+
+ /**
+ * Indicates that the controller is about to be locked. Enters the idle
+ * state, as all it will be doing is forwarding messages.
+ */
+ public void beforeLock() {
+ synchronized (curLocker) {
+ changeState(new IdleState(this));
+ }
+ }
+
+ /**
+ * Indicates that the controller has been unlocked. Enters the start state,
+ * if the controller is running.
+ */
+ public void afterUnlock() {
+ synchronized (curLocker) {
+ if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
+ changeState(new StartState(this));
+ }
+ }
+ }
+
+ /**
+ * Changes the finite state machine to a new state, provided the new state
+ * is not {@code null}.
+ *
+ * @param newState new state, or {@code null} if to remain unchanged
+ */
+ private void changeState(State newState) {
+ if (newState != null) {
+ current.cancelTimers();
+ current = newState;
+
+ // set the filter before starting the state
+ setFilter(newState.getFilter());
+ newState.start();
+ }
+ }
+
+ /**
+ * Sets the server-side filter for the internal topic.
+ *
+ * @param filter new filter to be used
+ */
+ private void setFilter(Map<String, Object> filter) {
+ try {
+ dmaapMgr.setFilter(serializer.encodeFilter(filter));
+
+ } catch (JsonProcessingException e) {
+ logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
+ }
+ }
+
+ @Override
+ public CountDownLatch internalTopicFailed() {
+ logger.error("communication failed for topic {}", topic);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ /*
+ * We don't want to build up items in our queue if we can't forward them
+ * to other hosts, so we just stop the controller.
+ *
+ * Use a background thread to prevent deadlocks.
+ */
+ new Thread() {
+ @Override
+ public void run() {
+ controller.stop();
+ latch.countDown();
+ }
+ }.start();
+
+ return latch;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(long delayMs, StateTimerTask task) {
+ return scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
+ return scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void publishAdmin(Message msg) {
+ publish(Message.ADMIN, msg);
+ }
+
+ @Override
+ public void publish(String channel, Message msg) {
+ msg.setChannel(channel);
+
+ try {
+ // ensure it's valid before we send it
+ msg.checkValidity();
+
+ String txt = serializer.encodeMsg(msg);
+ dmaapMgr.publish(txt);
+
+ } catch (JsonProcessingException e) {
+ logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
+ }
+ }
+
+ /**
+ * Handles an event from the internal topic.
+ *
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * controller should handle it
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
+
+ if (event == null) {
+ logger.error("null event on topic {}", topic);
+ return;
+ }
+
+ synchronized (curLocker) {
+ // it's on the internal topic
+ handleInternal(event);
+ }
+ }
+
+ /**
+ * Called by the PolicyController before it offers the event to the
+ * DroolsController. If the controller is locked, then it isn't processing
+ * events. However, they still need to be forwarded, thus in that case, they
+ * are decoded and forwarded.
+ * <p>
+ * On the other hand, if the controller is not locked, then we just return
+ * immediately and let {@link #beforeInsert(Object, String, String, Object)
+ * beforeInsert()} handle it instead, as it already has the decoded message.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+
+ if (!controller.isLocked() || !intercept) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+ }
+
+ /**
+ * Called by the DroolsController before it inserts the event into the rule
+ * engine.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event original event text, as received from the Bus
+ * @param event2 event, as an object
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
+
+ if (!intercept) {
+ // we should NOT intercept this message - let the invoker handle it
+ return false;
+ }
+
+ return handleExternal(protocol, topic2, event, extractRequestId(event2));
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @param reqid request id extracted from the event, or {@code null} if it
+ * couldn't be extracted
+ * @return {@code true} if the event was handled by the manager,
+ * {@code false} if it must still be handled by the invoker
+ */
+ private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
+ if (reqid == null) {
+ // no request id - let the invoker handle it
+ return false;
+ }
+
+ if (reqid.isEmpty()) {
+ logger.warn("handle locally due to empty request id for topic {}", topic2);
+ // no request id - let the invoker handle it
+ return false;
+ }
+
+ Forward ev = makeForward(protocol, topic2, event, reqid);
+ if (ev == null) {
+ // invalid args - consume the message
+ return true;
+ }
+
+ synchronized (curLocker) {
+ return handleExternal(ev);
+ }
+ }
+
+ /**
+ * Handles an event from an external topic.
+ *
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * invoker should handle it
+ */
+ private boolean handleExternal(Forward event) {
+ if (assignments == null) {
+ // no bucket assignments yet - add it to the queue
+ eventq.add(event);
+
+ // we've consumed the event
+ return true;
+
+ } else {
+ return handleEvent(event);
+ }
+ }
+
+ /**
+ * Handles a {@link Forward} event, possibly forwarding it again.
+ *
+ * @param event
+ * @return {@code true} if the event was handled, {@code false} if the
+ * invoker should handle it
+ */
+ private boolean handleEvent(Forward event) {
+ int bucket = Math.abs(event.getRequestId().hashCode()) % assignments.size();
+ String target = assignments.getAssignedHost(bucket);
+
+ if (target == null) {
+ /*
+ * This bucket has no assignment - just discard the event
+ */
+ return true;
+ }
+
+ if (target.equals(host)) {
+ /*
+ * Message belongs to this host - allow the controller to handle it.
+ */
+ return false;
+ }
+
+ // forward to a different host, if hop count has been exhausted
+ if (event.getNumHops() > MAX_HOPS) {
+ logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
+ topic);
+
+ } else {
+ event.bumpNumHops();
+ publish(target, event);
+ }
+
+ // either way, consume the event
+ return true;
+ }
+
+ /**
+ * Extract the request id from an event object.
+ *
+ * @param event the event object, or {@code null}
+ * @return the event's request id, or {@code null} if it can't be extracted
+ */
+ private String extractRequestId(Object event) {
+ if (event == null) {
+ return null;
+ }
+
+ Object reqid = extractors.extract(event);
+ return (reqid != null ? reqid.toString() : null);
+ }
+
+ /**
+ * Decodes an event from a String into an event Object.
+ *
+ * @param topic2
+ * @param event
+ * @return the decoded event object, or {@code null} if it can't be decoded
+ */
+ private Object decodeEvent(String topic2, String event) {
+ DroolsController drools = controller.getDrools();
+
+ // check if this topic has a decoder
+
+ if (!factory.canDecodeEvent(drools, topic2)) {
+
+ logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
+ drools.getArtifactId());
+ return null;
+ }
+
+ // decode
+
+ try {
+ return factory.decodeEvent(drools, topic2, event);
+
+ } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
+ logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * Makes a {@link Forward}, and validates its contents.
+ *
+ * @param protocol
+ * @param topic2
+ * @param event
+ * @param reqid
+ * @return a new message, or {@code null} if the message was invalid
+ */
+ private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
+ try {
+ Forward ev = new Forward(host, protocol, topic2, event, reqid);
+
+ // required for the validity check
+ ev.setChannel(host);
+
+ ev.checkValidity();
+
+ return ev;
+
+ } catch (PoolingFeatureException e) {
+ logger.error("invalid message for topic {}", topic2, e);
+ return null;
+ }
+ }
+
+ @Override
+ public void handle(Forward event) {
+ synchronized (curLocker) {
+ if (!handleExternal(event)) {
+ // this host should handle it - inject it
+ inject(event);
+ }
+ }
+ }
+
+ /**
+ * Injects an event into the controller.
+ *
+ * @param event
+ */
+ private void inject(Forward event) {
+ intercept = false;
+ listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
+
+ intercept = true;
+ }
+
+ /**
+ * Handles an event from the internal topic. This uses reflection to
+ * identify the appropriate process() method to invoke, based on the type of
+ * Message that was decoded.
+ *
+ * @param event the serialized {@link Message} read from the internal topic
+ */
+ private void handleInternal(String event) {
+ Class<?> clazz = null;
+
+ try {
+ Message msg = serializer.decodeMsg(event);
+
+ // get the class BEFORE checking the validity
+ clazz = msg.getClass();
+
+ msg.checkValidity();
+
+ Method meth = current.getClass().getMethod("process", msg.getClass());
+ changeState((State) meth.invoke(current, msg));
+
+ } catch (IOException e) {
+ logger.warn("failed to decode message for topic {}", topic, e);
+
+ } catch (NoSuchMethodException | SecurityException e) {
+ logger.error("no processor for message {} for topic {}", clazz, topic, e);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+
+ } catch (PoolingFeatureException e) {
+ logger.error("failed to process message {} for topic {}", clazz, topic, e);
+ }
+ }
+
+ @Override
+ public void startDistributing(BucketAssignments assignments) {
+ if (assignments == null) {
+ return;
+ }
+
+ synchronized (curLocker) {
+ this.assignments = assignments;
+
+ // now that we have assignments, we can process the queue
+ Forward ev;
+ while ((ev = eventq.poll()) != null) {
+ handle(ev);
+ }
+ }
+ }
+
+ @Override
+ public BucketAssignments getAssignments() {
+ return assignments;
+ }
+
+ @Override
+ public State goStart() {
+ return new StartState(this);
+ }
+
+ @Override
+ public State goQuery() {
+ return new QueryState(this);
+ }
+
+ @Override
+ public State goActive() {
+ return new ActiveState(this);
+ }
+
+ @Override
+ public State goInactive() {
+ return new InactiveState(this);
+ }
+
+ /**
+ * Action to run a timer task. Only runs the task if the machine is still in
+ * the state that it was in when the timer was created.
+ */
+ private class TimerAction implements Runnable {
+
+ /**
+ * State of the machine when the timer was created.
+ */
+ private State origState;
+
+ /**
+ * Task to be executed.
+ */
+ private StateTimerTask task;
+
+ /**
+ *
+ * @param task task to execute when this timer runs
+ */
+ public TimerAction(StateTimerTask task) {
+ this.origState = current;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ synchronized (curLocker) {
+ if (current == origState) {
+ changeState(task.fire(null));
+ }
+ }
+ }
+ }
+
+ /**
+ * Factory used to create objects.
+ */
+ public static class Factory {
+
+ /**
+ * Creates an event queue.
+ *
+ * @param props properties used to configure the event queue
+ * @return a new event queue
+ */
+ public EventQueue makeEventQueue(PoolingProperties props) {
+ return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
+ }
+
+ /**
+ * Creates object extractors.
+ *
+ * @param props properties used to configure the extractors
+ * @return a new set of extractors
+ */
+ public ClassExtractors makeClassExtractors(Properties props) {
+ return new ClassExtractors(props, PROP_EXTRACTOR_PREFIX, EXTRACTOR_TYPE);
+ }
+
+ /**
+ * Creates a DMaaP manager.
+ *
+ * @param props properties used to configure the manager
+ * @return a new DMaaP manager
+ * @throws PoolingFeatureException if an error occurs
+ */
+ public DmaapManager makeDmaapManager(PoolingProperties props) throws PoolingFeatureException {
+ return new DmaapManager(props.getPoolingTopic(), props.getSource());
+ }
+
+ /**
+ * Creates a scheduled thread pool.
+ *
+ * @return a new scheduled thread pool
+ */
+ public ScheduledThreadPoolExecutor makeScheduler() {
+ return new ScheduledThreadPoolExecutor(1);
+ }
+
+ /**
+ * Determines if the event can be decoded.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @return {@code true} if the event can be decoded, {@code false}
+ * otherwise
+ */
+ public boolean canDecodeEvent(DroolsController drools, String topic) {
+ return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
+ }
+
+ /**
+ * Decodes the event.
+ *
+ * @param drools drools controller
+ * @param topic topic on which the event was received
+ * @param event event text to be decoded
+ * @return the decoded event
+ * @throws IllegalArgumentException
+ * @throw UnsupportedOperationException
+ * @throws IllegalStateException
+ */
+ public Object decodeEvent(DroolsController drools, String topic, String event) {
+ return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);
+ }
+ }
+}