diff options
author | Jim Hahn <jrh3@att.com> | 2020-08-31 20:53:44 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-08-31 20:53:44 +0000 |
commit | a588736799d94747f70ed648d3d821210993c5c4 (patch) | |
tree | f54ea646db61861985fe6ae87b76d87e8a9369a2 /controlloop/common/controller-tdjam/src/main/java | |
parent | 1c5cb8a0d740ccd92d2b3fdce8eb192cd20b147f (diff) | |
parent | a6d4077e3639a0f3478f0cbf51e06ef46517a10d (diff) |
Merge "Add tdjam-controller"
Diffstat (limited to 'controlloop/common/controller-tdjam/src/main/java')
3 files changed, 1624 insertions, 0 deletions
diff --git a/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/SerialWorkQueue.java b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/SerialWorkQueue.java new file mode 100644 index 000000000..7d83765a3 --- /dev/null +++ b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/SerialWorkQueue.java @@ -0,0 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.tdjam; + +import java.util.LinkedList; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides a way to handle synchronization, with minimal blocking. Requests + * are queued until {@link #start()} is invoked. + */ +public class SerialWorkQueue { + private static Logger logger = LoggerFactory.getLogger(SerialWorkQueue.class); + + // current work list + private LinkedList<Runnable> workQueue; + + @Getter + private boolean running = false; + + /** + * Constructor - no initial Runnable. + */ + public SerialWorkQueue() { + workQueue = new LinkedList<>(); + } + + /** + * Constructor - initial 'Runnable' is specified. + * + * @param runnable an initial 'Runnnable' to run + */ + public SerialWorkQueue(Runnable runnable) { + workQueue = new LinkedList<>(); + workQueue.add(runnable); + } + + /** + * Starts the queue. If the current thread is the first to start it, then the current + * thread will process any requests in the queue before returning. + */ + public void start() { + Runnable item; + + synchronized (this) { + if (running) { + // already running + return; + } + + running = true; + item = workQueue.peekFirst(); + } + + if (item != null) { + processQueue(item); + } + } + + /** + * Called to add a 'Runnable' to the work queue. If the queue was empty, the current + * thread is used to process the queue. + * + * @param work the Runnable to be queued, and eventually run + */ + public void queueAndRun(Runnable work) { + synchronized (this) { + workQueue.add(work); + if (!running || workQueue.size() > 1) { + // there was already work in the queue, so presumably there is + // already an associated thread running + return; + } + // if we reach this point, the queue was empty when this method was + // called, so this thread will process the queue + } + + processQueue(work); + } + + /** + * Internal method to process the work queue until it is empty. Note that entries + * could be added by this thread or another one while we are working. + * + * @param firstItem the first item in the queue + */ + private void processQueue(Runnable firstItem) { + Runnable next = firstItem; + while (next != null) { + try { + next.run(); + } catch (Exception e) { + logger.error("SerialWorkQueue.processQueue exception", e); + } + + synchronized (this) { + // remove the job we just ran + workQueue.removeFirst(); + next = workQueue.peekFirst(); + } + } + } +} diff --git a/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/TdjamController.java b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/TdjamController.java new file mode 100644 index 000000000..0b17f196f --- /dev/null +++ b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/controlloop/tdjam/TdjamController.java @@ -0,0 +1,833 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.tdjam; + +import static org.onap.policy.drools.properties.DroolsPropertyConstants.PROPERTY_CONTROLLER_TYPE; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.controlloop.CanonicalOnset; +import org.onap.policy.controlloop.ControlLoopEvent; +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.ControlLoopNotificationType; +import org.onap.policy.controlloop.ControlLoopResponse; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.VirtualControlLoopNotification; +import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; +import org.onap.policy.controlloop.eventmanager.ControlLoopEventManager2; +import org.onap.policy.controlloop.utils.ControlLoopUtils; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.features.DroolsControllerFeatureApi; +import org.onap.policy.drools.features.PolicyControllerFeatureApi; +import org.onap.policy.drools.protocol.coders.EventProtocolCoder.CoderFilters; +import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants; +import org.onap.policy.drools.protocol.coders.ProtocolCoderToolset; +import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.extension.system.NonDroolsPolicyController; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This replaces a Drools session with Java code. Although Drools memory + * is simulated when running the Junit tests, there is no actual use of + * Drools here. + */ +public class TdjamController extends NonDroolsPolicyController { + private static Logger logger = LoggerFactory.getLogger(TdjamController.class); + + // the 'controller.type' property is set to this value + private static final String TDJAM_CONTROLLER_BUILDER_TAG = "tdjam"; + + // additional data associated with session + private final String groupId; + private final String artifactId; + + // top-level tosca policy table (first key = name, second key = version) + private final Map<String, Map<String, ToscaPolicy>> toscaPolicies = new HashMap<>(); + + // maps 'controlLoopControlName' to 'ControlLoopParams' + private final Map<String, ControlLoopParams> controlLoopParams = new HashMap<>(); + + // maps 'requestId' to 'ControlLoopEventManager' + private final Map<UUID, ControlLoopEventManager> eventManagers = new ConcurrentHashMap<>(); + + // maps onset to 'ControlLoopEventManager' + private final Map<VirtualControlLoopEvent, ControlLoopEventManager> onsetToEventManager = new ConcurrentHashMap<>(); + + // maps 'topic' to 'TopicData' + private final Map<String, TopicData> topicDataTable = new ConcurrentHashMap<>(); + + /* ============================================================ */ + + /** + * Initialize a new 'TdjamController'. + * + * @param name the controller name + * @param properties properties defining the controller + */ + public TdjamController(String name, Properties properties) { + super(name, properties); + + this.groupId = getGroupId(); + this.artifactId = getArtifactId(); + + init(); + } + + private void init() { + // go through all of the incoming message decoders associated + // with this controller + for (ProtocolCoderToolset pct : + EventProtocolCoderConstants.getManager() + .getDecoders(groupId, artifactId)) { + // go through the 'CoderFilters' instances, and see if there are + // any that we are interested in + for (CoderFilters cf : pct.getCoders()) { + try { + Class<?> clazz = Class.forName(cf.getCodedClass()); + if (ControlLoopEvent.class.isAssignableFrom(clazz)) { + // this one is of interest + logger.debug("TdjamController using CoderFilters: {}", cf); + getTopicData(pct.getTopic()); + } + } catch (ClassNotFoundException e) { + logger.error("CoderFilter refers to unknown class: {}", + cf.getCodedClass(), e); + } + } + } + + // start all 'TopicData' instances + for (TopicData topicData : topicDataTable.values()) { + topicData.start(); + } + } + + @Override + public <T> boolean offer(T object) { + if (object instanceof ToscaPolicy) { + addToscaPolicy((ToscaPolicy) object); + return true; + } + return false; + } + + /** + * Add or replace a ToscaPolicy instance. The policy is keyed by name and + * version. + * + * @param toscaPolicy the ToscaPolicy being added + * @return if a ToscaPolicy with this name/version previously existed within + * this TdjamController, it is returned; otherwise, 'null' is returned. + */ + public synchronized ToscaPolicy addToscaPolicy(ToscaPolicy toscaPolicy) { + Map<String, ToscaPolicy> level2 = + toscaPolicies.computeIfAbsent(toscaPolicy.getName(), + key -> new HashMap<String, ToscaPolicy>()); + ToscaPolicy prev = level2.put(toscaPolicy.getVersion(), toscaPolicy); + if (prev != null) { + // update 'ControlLoopParams' entries + for (ControlLoopParams clp : controlLoopParams.values()) { + if (clp.getToscaPolicy() == prev) { + clp.setToscaPolicy(toscaPolicy); + } + } + } + logger.debug("ToscaPolicy name={}, version={}, count={}, prev={}", + toscaPolicy.getName(), toscaPolicy.getVersion(), toscaPolicies.size(), (prev != null)); + dumpTables(); + + // attempt to create a 'ControlLoopParams' instance from this object + ControlLoopParams params = + ControlLoopUtils.toControlLoopParams(toscaPolicy); + if (params != null) { + addControlLoopParams(params); + } + return prev; + } + + /** + * Remove a ToscaPolicy instance associated with the specified name and + * version. + * + * @param name the name of the ToscaPolicy to remove + * @param version the version of the ToscaPolicy to remove + * @return the ToscaPolicy that was removed, or 'null' if not found + */ + public synchronized ToscaPolicy removeToscaPolicy(String name, String version) { + ToscaPolicy prev = null; + Map<String, ToscaPolicy> level2 = toscaPolicies.get(name); + + if (level2 != null && (prev = level2.remove(version)) != null) { + // remove all 'ControlLoopParams' entries referencing this policy + for (ControlLoopParams clp : + new ArrayList<>(controlLoopParams.values())) { + if (clp.getToscaPolicy() == prev) { + controlLoopParams.remove(clp.getClosedLoopControlName()); + } + } + } + return prev; + } + + /** + * Fetch a ToscaPolicy instance associated with the specified name and + * version. + * + * @param name the name of the ToscaPolicy + * @param version the version of the ToscaPolicy + * @return the ToscaPolicy, or 'null' if not found + */ + public synchronized ToscaPolicy getToscaPolicy(String name, String version) { + Map<String, ToscaPolicy> level2 = toscaPolicies.get(name); + return (level2 == null ? null : level2.get(version)); + } + + /** + * Return a collection of all ToscaPolicy instances. + * + * @return all ToscaPolicy instances + */ + public synchronized Collection<ToscaPolicy> getAllToscaPolicies() { + HashSet<ToscaPolicy> rval = new HashSet<>(); + for (Map<String, ToscaPolicy> map : toscaPolicies.values()) { + rval.addAll(map.values()); + } + return rval; + } + + /** + * Add a new 'ControlLoopParams' instance -- they are keyed by + * 'closedLoopControlName'. + * + * @param clp the 'ControlLoopParams' instance to add + * @return the 'ControlLoopParams' instance previously associated with the + * 'closedLoopControlName' ('null' if it didn't exist) + */ + public synchronized ControlLoopParams addControlLoopParams(ControlLoopParams clp) { + ToscaPolicy toscaPolicy = + getToscaPolicy(clp.getPolicyName(), clp.getPolicyVersion()); + if (toscaPolicy == null) { + // there needs to be a 'ToscaPolicy' instance with a matching + // name/version + logger.debug("Missing ToscaPolicy, name={}, version={}", + clp.getPolicyName(), clp.getPolicyVersion()); + return clp; + } + + clp.setToscaPolicy(toscaPolicy); + ControlLoopParams prev = + controlLoopParams.put(clp.getClosedLoopControlName(), clp); + + logger.debug("ControlLoopParams name={}, version={}, closedLoopControlName={}, count={}, prev={}", + clp.getPolicyName(), clp.getPolicyVersion(), + clp.getClosedLoopControlName(), controlLoopParams.size(), (prev != null)); + dumpTables(); + return prev; + } + + /** + * Return a collection of all ControlLoopParams instances. + * + * @return all ControlLoopParams instances + */ + public synchronized Collection<ControlLoopParams> getAllControlLoopParams() { + return new ArrayList<>(controlLoopParams.values()); + } + + /** + * Return a collection of all EventManager instances. + * + * @return all EventManager instances + * + */ + public synchronized Collection<ControlLoopEventManager> getAllEventManagers() { + return new ArrayList<>(eventManagers.values()); + } + + /** + * Return a collection of all onsetToEventManager instances. + * + * @return all onsetToEventManager instances + * + */ + public synchronized Collection<ControlLoopEventManager> getAllOnsetToEventManager() { + return new ArrayList<>(onsetToEventManager.values()); + } + + /** + * Reset the controller. + * + */ + public synchronized void reset() { + toscaPolicies.clear(); + controlLoopParams.clear(); + eventManagers.clear(); + onsetToEventManager.clear(); + } + + @Override + public boolean stop() { + super.stop(); + + // stop all 'TopicData' instances + for (TopicData topicData : topicDataTable.values()) { + topicData.stop(); + } + return true; + } + + /** + * Remove a ControlLoopParams instance associated with the specified + * 'closedLoopControlName'. + * + * @param closedLoopControlName the closedLoopControlName identifying the + * ControlLoopParams instance + * @return the 'ControlLoopParams' instance, 'null' if not found + */ + public synchronized ControlLoopParams removeControlLoopParams(String closedLoopControlName) { + return controlLoopParams.remove(closedLoopControlName); + } + + /** + * Dump out the ToscaPolicy and ControlLoopParams tables in + * human-readable form. + */ + private void dumpTables() { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bos, true); + + // name(25) version(10) closedLoopControlName(...) + + String format = "%-25s %-10s %s\n"; + out.println("ToscaPolicy Table"); + out.format(format, "Name", "Version", ""); + out.format(format, "----", "-------", ""); + + for (Map<String, ToscaPolicy> level2 : toscaPolicies.values()) { + for (ToscaPolicy tp : level2.values()) { + out.format(format, tp.getName(), tp.getVersion(), ""); + } + } + + out.println("\nControlLoopParams Table"); + out.format(format, "Name", "Version", "ClosedLoopControlName"); + out.format(format, "----", "-------", "---------------------"); + for (ControlLoopParams cp : controlLoopParams.values()) { + out.format(format, cp.getPolicyName(), cp.getPolicyVersion(), + cp.getClosedLoopControlName()); + } + + logger.debug(new String(bos.toByteArray())); + } + + /** + * Find or create a 'TopicData' instance associated with the specified + * topic name. + * + * @param name the topic name + * @return the new or existing 'TopicData' instance associated with 'name' + */ + private TopicData getTopicData(String name) { + return topicDataTable.computeIfAbsent(name, key -> new TopicData(name)); + } + + /* ============================================================ */ + + /** + * Process an incoming 'ControlLoopEvent'. + * + * @param event the incoming 'ControlLoopEvent' + */ + private void processEvent(ControlLoopEvent event) { + String clName = event.getClosedLoopControlName(); + ControlLoopParams params = controlLoopParams.get(clName); + if (params == null) { + logger.debug("No ControlLoopParams for event: {}", event); + return; + } + + UUID requestId = event.getRequestId(); + if (event instanceof CanonicalOnset) { + CanonicalOnset coEvent = (CanonicalOnset) event; + + if (requestId == null) { + // the requestId should not be 'null' + handleNullRequestId(coEvent, params); + return; + } + + ControlLoopEventManager manager = onsetToEventManager.computeIfAbsent(coEvent, key -> { + // a ControlLoopEventManager does not yet exist for this + // 'event' -- create one, with the initial event + try { + ControlLoopEventManager mgr = new ControlLoopEventManager(params, coEvent); + eventManagers.put(requestId, mgr); + return mgr; + } catch (ControlLoopException e) { + logger.error("Exception creating ControlLoopEventManager", e); + return null; + } + }); + + if (manager != null && !manager.getSerialWorkQueue().isRunning()) { + // new manager - start it by processing the initial event + manager.getSerialWorkQueue().start(); + return; + } + } + + if (event instanceof VirtualControlLoopEvent) { + ControlLoopEventManager manager = eventManagers.get(requestId); + if (manager != null) { + manager.getSerialWorkQueue() + .queueAndRun(() -> manager.subsequentEvent((VirtualControlLoopEvent) event)); + return; + } + } + + // this block of code originally appeared in the 'EVENT.CLEANUP' + // Drools rule + String ruleName = "EVENT.CLEANUP"; + + logger.info("{}: {}", clName, ruleName); + logger.debug("{}: {}: orphan event={}", clName, ruleName, event); + } + + /** + * Generate and send a notification message in response to a 'CanonicalOnset' + * with a null 'requestId'. + * + * @param event the CanonicalOnset event + * @param params the associated ControlLoopParams + */ + private void handleNullRequestId(CanonicalOnset event, + ControlLoopParams params) { + // this block of code originally appeared in the 'EVENT' Drools rule + String ruleName = "EVENT"; + String clName = event.getClosedLoopControlName(); + + VirtualControlLoopNotification notification = + new VirtualControlLoopNotification(event); + notification.setNotification(ControlLoopNotificationType.REJECTED); + notification.setFrom("policy"); + notification.setMessage("Missing requestId"); + notification.setPolicyName(params.getPolicyName() + "." + ruleName); + notification.setPolicyScope(params.getPolicyScope()); + notification.setPolicyVersion(params.getPolicyVersion()); + + // + // Generate notification + // + try { + PolicyEngineConstants.getManager().deliver("POLICY-CL-MGT", notification); + + } catch (RuntimeException e) { + logger.warn("{}: {}.{}: event={} exception generating notification", + clName, params.getPolicyName(), ruleName, + event, e); + } + } + + /* ============================================================ */ + + /** + * This nested class corresponds to a single topic name. At present, the + * only topics that are directly handled by this class are + * 'ControlLoopEvent', and subclasses (hence, the call to 'processEvent'). + * If other event types later need to be directly handled, this may need to + * become an abstract class, with subclasses for the various event types. + */ + private class TopicData implements TopicListener { + // topic name + private String name; + + // set of 'TopicSource' instances associated with this topic + // (probably only one, but the underlying APIs support a list) + private List<TopicSource> topicSources = null; + + /** + * Constructor -- initialize the 'TopicData' instance. + * + * @param name the topic name + */ + private TopicData(String name) { + this.name = name; + } + + /** + * Register all of the 'TopicSource' instances associated with this + * topic, and start the listeners. + */ + private void start() { + if (topicSources == null) { + // locate topic sources + ArrayList<String> topics = new ArrayList<>(); + topics.add(name); + topicSources = TopicEndpointManager.getManager().getTopicSources(topics); + } + + for (TopicSource consumer : topicSources) { + consumer.register(this); + consumer.start(); + } + } + + /** + * Unregister all of the 'TopicSource' instances associated with this + * topic, and stop the listeners. + */ + private void stop() { + if (topicSources != null) { + for (TopicSource consumer : topicSources) { + consumer.unregister(this); + consumer.stop(); + } + } + } + + /*===========================*/ + /* 'TopicListener' interface */ + /*===========================*/ + + @Override + public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event) { + logger.debug("TopicData.onTopicEvent: {}", event); + Object decodedObject = + EventProtocolCoderConstants.getManager().decode(groupId, artifactId, topic, event); + if (decodedObject != null) { + logger.debug("Decoded to object of {}", decodedObject.getClass()); + if (decodedObject instanceof ControlLoopEvent) { + PolicyEngineConstants.getManager().getExecutorService().execute(() -> + processEvent((ControlLoopEvent) decodedObject)); + } + } + } + } + + /* ============================================================ */ + + /** + * This is a 'ControlLoopEventManager2' variant designed to run under + * 'TdjamController'. + */ + private class ControlLoopEventManager extends ControlLoopEventManager2 { + private static final long serialVersionUID = 1L; + + // used to serialize method calls from multiple threads, which avoids the + // need for additional synchronization + private final SerialWorkQueue serialWorkQueue; + + private final ControlLoopParams params; + + // onset event + private final CanonicalOnset event; + + /** + * Constructor - initialize a ControlLoopEventManager. + * + * @param params the 'ControlLoopParam's instance associated with the + * 'closedLoopControlName' + * @param event the initial ControlLoopEvent + */ + private ControlLoopEventManager(ControlLoopParams params, CanonicalOnset event) + throws ControlLoopException { + + super(params, event); + this.params = params; + this.event = event; + this.serialWorkQueue = new SerialWorkQueue(this::initialEvent); + } + + /** + * Return the SerialWorkQueue. + * + * @return the SerialWorkQueue + */ + private SerialWorkQueue getSerialWorkQueue() { + return serialWorkQueue; + } + + /** + * This is a notification from the base class that a state transition + * has occurred. + */ + @Override + protected void notifyUpdate() { + update(); + } + + /** + * Process the initial event from DCAE that caused the + * 'ControlLoopEventManager' to be created. + */ + private void initialEvent() { + // this block of code originally appeared in the 'EVENT' Drools rule + String ruleName = "EVENT"; + UUID requestId = event.getRequestId(); + String clName = event.getClosedLoopControlName(); + + VirtualControlLoopNotification notification; + + try { + // + // Check the event, because we need it to not be null when + // we create the ControlLoopEventManager. The ControlLoopEventManager + // will do extra syntax checking as well as check if the closed loop is disabled. + // + try { + start(); + } catch (Exception e) { + eventManagers.remove(requestId, this); + onsetToEventManager.remove(event, this); + throw e; + } + notification = makeNotification(); + notification.setNotification(ControlLoopNotificationType.ACTIVE); + notification.setPolicyName(params.getPolicyName() + "." + ruleName); + } catch (Exception e) { + logger.warn("{}: {}.{}", clName, params.getPolicyName(), ruleName, e); + notification = new VirtualControlLoopNotification(event); + notification.setNotification(ControlLoopNotificationType.REJECTED); + notification.setMessage("Exception occurred: " + e.getMessage()); + notification.setPolicyName(params.getPolicyName() + "." + ruleName); + notification.setPolicyScope(params.getPolicyScope()); + notification.setPolicyVersion(params.getPolicyVersion()); + } + // + // Generate notification + // + try { + PolicyEngineConstants.getManager().deliver("POLICY-CL-MGT", notification); + + } catch (RuntimeException e) { + logger.warn("{}: {}.{}: event={} exception generating notification", + clName, params.getPolicyName(), ruleName, + event, e); + } + } + + /** + * Process a subsequent event from DCAE. + * + * @param event the VirtualControlLoopEvent event + */ + private void subsequentEvent(VirtualControlLoopEvent event) { + // this block of code originally appeared in the + // 'EVENT.MANAGER>NEW.EVENT' Drools rule + String ruleName = "EVENT.MANAGER.NEW.EVENT"; + + // + // Check what kind of event this is + // + switch (onNewEvent(event)) { + case SYNTAX_ERROR: + // + // Ignore any bad syntax events + // + logger.warn("{}: {}.{}: syntax error", + getClosedLoopControlName(), getPolicyName(), ruleName); + break; + + case FIRST_ABATEMENT: + case SUBSEQUENT_ABATEMENT: + // + // TODO: handle the abatement. Currently, it's just discarded. + // + break; + + case FIRST_ONSET: + case SUBSEQUENT_ONSET: + default: + // + // We don't care about subsequent onsets + // + logger.warn("{}: {}.{}: subsequent onset", + getClosedLoopControlName(), getPolicyName(), ruleName); + break; + } + } + + /** + * Called when a state transition occurs. + */ + private void update() { + // handle synchronization by running it under the SerialWorkQueue + getSerialWorkQueue().queueAndRun(() -> { + if (isActive()) { + updateActive(); + } else { + updateInactive(); + } + }); + } + + /** + * Called when a state transition occurs, and we are in the active state. + */ + private void updateActive() { + if (!isUpdated()) { + // no notification needed + return; + } + + // this block of code originally appeared in the + // 'EVENT.MANAGER.PROCESSING' Drools rule + String ruleName = "EVENT.MANAGER.PROCESSING"; + VirtualControlLoopNotification notification = + getNotification(); + + logger.info("{}: {}.{}: manager={}", + getClosedLoopControlName(), getPolicyName(), ruleName, + this); + // + // Generate notification + // + try { + notification.setPolicyName(getPolicyName() + "." + ruleName); + PolicyEngineConstants.getManager().deliver("POLICY-CL-MGT", notification); + + } catch (RuntimeException e) { + logger.warn("{}: {}.{}: manager={} exception generating notification", + getClosedLoopControlName(), getPolicyName(), ruleName, + this, e); + } + // + // Generate Response notification + // + try { + ControlLoopResponse clResponse = getControlLoopResponse(); + if (clResponse != null) { + PolicyEngineConstants.getManager().deliver("DCAE_CL_RSP", clResponse); + } + + } catch (RuntimeException e) { + logger.warn("{}: {}.{}: manager={} exception generating Response notification", + getClosedLoopControlName(), getPolicyName(), ruleName, + this, e); + } + // + // Discard this message and wait for the next response. + // + nextStep(); + update(); + } + + /** + * Called when a state transition has occurred, and we are not in the + * active state. + */ + private void updateInactive() { + // this block of code originally appeared in the 'EVENT.MANAGER.FINAL' + // Drools rule + String ruleName = "EVENT.MANAGER.FINAL"; + VirtualControlLoopNotification notification = + getNotification(); + + logger.info("{}: {}.{}: manager={}", + getClosedLoopControlName(), getPolicyName(), ruleName, + this); + // + // Generate notification + // + try { + notification.setPolicyName(getPolicyName() + "." + ruleName); + PolicyEngineConstants.getManager().deliver("POLICY-CL-MGT", notification); + } catch (RuntimeException e) { + logger.warn("{}: {}.{}: manager={} exception generating notification", + getClosedLoopControlName(), getPolicyName(), ruleName, + this, e); + } + // + // Destroy the manager + // + destroy(); + + // Remove the entry from the table + eventManagers.remove(getRequestId(), this); + onsetToEventManager.remove(event, this); + } + } + + /* ============================================================ */ + + /** + * An instance of this class is called by 'IndexedPolicyControllerFactory'. + * It does the build operation when the value of the 'controller.type' + * property matches the value of TDJAM_CONTROLLER_BUILDER_TAG. + */ + public static class PolicyBuilder implements PolicyControllerFeatureApi { + @Override + public int getSequenceNumber() { + return 1; + } + + @Override + public PolicyController beforeInstance(String name, Properties properties) { + if (TDJAM_CONTROLLER_BUILDER_TAG.equals(properties.getProperty(PROPERTY_CONTROLLER_TYPE))) { + return new TdjamController(name, properties); + } + return null; + } + } + + /* ============================================================ */ + + /** + * An instance of this class is called by 'IndexedDroolsControllerFactory'. + * It does the build operation when the value of the 'controller.type' + * property matches the value of TDJAM_CONTROLLER_BUILDER_TAG. + */ + public static class DroolsBuilder implements DroolsControllerFeatureApi { + @Override + public int getSequenceNumber() { + return 1; + } + + @Override + public DroolsController beforeInstance(Properties properties, + String groupId, String artifactId, String version, + List<TopicCoderFilterConfiguration> decoderConfigurations, + List<TopicCoderFilterConfiguration> encoderConfigurations) throws LinkageError { + + if (TDJAM_CONTROLLER_BUILDER_TAG.equals(properties.getProperty(PROPERTY_CONTROLLER_TYPE))) { + return TdjamController.getBuildInProgress(); + } + return null; + } + } +} diff --git a/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java new file mode 100644 index 000000000..d876bee96 --- /dev/null +++ b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java @@ -0,0 +1,668 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.extension.system; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.onap.policy.common.endpoints.event.comm.Topic; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.utils.services.OrderedServiceImpl; +import org.onap.policy.drools.controller.DroolsController; +import org.onap.policy.drools.core.PolicyContainer; +import org.onap.policy.drools.features.DroolsControllerFeatureApi; +import org.onap.policy.drools.features.DroolsControllerFeatureApiConstants; +import org.onap.policy.drools.protocol.coders.EventProtocolCoder; +import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants; +import org.onap.policy.drools.protocol.coders.EventProtocolParams; +import org.onap.policy.drools.protocol.coders.JsonProtocolFilter; +import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration; +import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder; +import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter; +import org.onap.policy.drools.system.internal.AggregatedPolicyController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class combines the 'PolicyController' and 'DroolsController' + * interfaces, and provides a controller that does not have Drools running + * underneath. It also contains some code copied from 'MavenDroolsController' + * and 'NullDroolsController'. The goal is to have it look like other + * controllers, use the same style property file, and provide access to + * UEB/DMAAP message streams associated with the controller. + */ +public class NonDroolsPolicyController extends AggregatedPolicyController implements DroolsController { + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(NonDroolsPolicyController.class); + + /** + * The PolicyController and DroolsController factories assume that the + * controllers are separate objects, but in this case, the same object + * is used for both. We want the DroolsController 'build' method to + * return the same object; however, at the point the DroolsController + * build is taking place, the PolicyController hasn't yet been placed + * in any tables. The following variable is used to pass this information + * from one stack frame to another within the same thread. + */ + private static ThreadLocal<NonDroolsPolicyController> buildInProgress = new ThreadLocal<>(); + + /** + * alive status of this drools controller, + * reflects invocation of start()/stop() only. + */ + protected volatile boolean alive = false; + + /** + * locked status of this drools controller, + * reflects if i/o drools related operations are permitted, + * more specifically: offer() and deliver(). + * It does not affect the ability to start and stop + * underlying drools infrastructure + */ + protected volatile boolean locked = false; + + /** + * list of topics, each with associated decoder classes, each + * with a list of associated filters. + */ + protected List<TopicCoderFilterConfiguration> decoderConfigurations; + + /** + * list of topics, each with associated encoder classes, each + * with a list of associated filters. + */ + protected List<TopicCoderFilterConfiguration> encoderConfigurations; + + /** + * recent sink events processed. + */ + protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10); + + // this is used to avoid infinite recursion in a shutdown or halt operation + private boolean shutdownInProgress = false; + + private static Properties convert(String name, Properties properties) { + + Properties newProperties = new Properties(); + for (String pname : properties.stringPropertyNames()) { + newProperties.setProperty(pname, properties.getProperty(pname)); + } + + newProperties.setProperty("rules.groupId", "NonDroolsPolicyController"); + newProperties.setProperty("rules.artifactId", name); + newProperties.setProperty("rules.version", "1.0"); + return newProperties; + } + + /** + * constructor -- pass parameters to superclass. + * @param name controller name + * @param properties contents of controller properties file + */ + public NonDroolsPolicyController(String name, Properties properties) { + super(name, convert(name, properties)); + } + + /** + * This is used to pass the 'NonDroolsPolicyController' object to the + * 'DroolsPolicyBuilder' object, as the same object is used for both + * 'PolicyController' and 'DroolsController'. + * + * @return the NonDroolsPolicyController object ('null' if not available) + */ + public static NonDroolsPolicyController getBuildInProgress() { + return buildInProgress.get(); + } + + protected void initDrools(Properties properties) { + try { + // Register with drools factory + buildInProgress.set(this); + this.droolsController.set(getDroolsFactory().build(properties, sources, sinks)); + buildInProgress.set(null); + } catch (Exception | LinkageError e) { + logger.error("{}: cannot init-drools", this); + throw new IllegalArgumentException(e); + } + + decoderConfigurations = codersAndFilters(properties, sources); + encoderConfigurations = codersAndFilters(properties, sinks); + + // add to 'EventProtocolCoderConstants.getManager()' table + for (TopicCoderFilterConfiguration tcfc : decoderConfigurations) { + for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) { + getCoderManager().addDecoder( + EventProtocolParams.builder() + .groupId(getGroupId()) + .artifactId(getArtifactId()) + .topic(tcfc.getTopic()) + .eventClass(pcf.getCodedClass()) + .protocolFilter(pcf.getFilter()) + .customGsonCoder(tcfc.getCustomGsonCoder()) + .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode())); + } + } + for (TopicCoderFilterConfiguration tcfc : encoderConfigurations) { + for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) { + getCoderManager().addEncoder( + EventProtocolParams.builder() + .groupId(getGroupId()) + .artifactId(getArtifactId()) + .topic(tcfc.getTopic()) + .eventClass(pcf.getCodedClass()) + .protocolFilter(pcf.getFilter()) + .customGsonCoder(tcfc.getCustomGsonCoder()) + .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode())); + } + } + } + + /*==============================*/ + /* 'DroolsController' interface */ + /*==============================*/ + + // methods copied from 'MavenDroolsController' and 'NullDroolsController' + + @Override + public boolean start() { + + logger.info("START: {}", this); + + synchronized (this) { + if (this.alive) { + return true; + } + this.alive = true; + } + + return true; + } + + @Override + public boolean stop() { + + logger.info("STOP: {}", this); + + synchronized (this) { + if (!this.alive) { + return true; + } + this.alive = false; + } + + return true; + } + + @Override + public void shutdown() { + if (shutdownInProgress) { + // avoid infinite recursion + return; + } + logger.info("{}: SHUTDOWN", this); + + try { + this.stop(); + this.removeCoders(); + shutdownInProgress = true; + + // the following method calls 'this.shutdown' recursively + getDroolsFactory().shutdown(this); + } catch (Exception e) { + logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e); + } finally { + shutdownInProgress = false; + } + } + + @Override + public void halt() { + if (shutdownInProgress) { + // avoid infinite recursion + return; + } + logger.info("{}: HALT", this); + + try { + this.stop(); + this.removeCoders(); + shutdownInProgress = true; + + // the following method calls 'this.halt' recursively + getDroolsFactory().destroy(this); + } catch (Exception e) { + logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e); + } finally { + shutdownInProgress = false; + } + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + logger.info("LOCK: {}", this); + + this.locked = true; + return true; + } + + @Override + public boolean unlock() { + logger.info("UNLOCK: {}", this); + + this.locked = false; + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public String getGroupId() { + return "NonDroolsPolicyController"; + } + + @Override + public String getArtifactId() { + return getName(); + } + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public List<String> getSessionNames() { + return new ArrayList<>(); + } + + @Override + public List<String> getCanonicalSessionNames() { + return new ArrayList<>(); + } + + @Override + public List<String> getBaseDomainNames() { + return Collections.emptyList(); + } + + @Override + public boolean offer(String topic, String event) { + return false; + } + + @Override + public <T> boolean offer(T event) { + return false; + } + + @Override + public boolean deliver(TopicSink sink, Object event) { + + // this one is from 'MavenDroolsController' + + logger.info("{} DELIVER: {} FROM {} TO {}", this, event, this, sink); + + for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { + try { + if (feature.beforeDeliver(this, sink, event)) { + return true; + } + } catch (Exception e) { + logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(), + e.getMessage(), e); + } + } + + if (sink == null) { + throw new IllegalArgumentException(this + " invalid sink"); + } + + if (event == null) { + throw new IllegalArgumentException(this + " invalid event"); + } + + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + String json = + getCoderManager().encode(sink.getTopic(), event, this); + + synchronized (this.recentSinkEvents) { + this.recentSinkEvents.add(json); + } + + boolean success = sink.send(json); + + for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { + try { + if (feature.afterDeliver(this, sink, event, json, success)) { + return true; + } + } catch (Exception e) { + logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(), + e.getMessage(), e); + } + } + + return success; + + } + + @Override + public Object[] getRecentSourceEvents() { + return new String[0]; + } + + @Override + public PolicyContainer getContainer() { + return null; + } + + @Override + public String[] getRecentSinkEvents() { + synchronized (this.recentSinkEvents) { + String[] events = new String[recentSinkEvents.size()]; + return recentSinkEvents.toArray(events); + } + } + + @Override + public boolean ownsCoder(Class<?> coderClass, int modelHash) { + //throw new IllegalStateException(makeInvokeMsg()); + return true; + } + + @Override + public Class<?> fetchModelClass(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(makeInvokeMsg()); + } + } + + @Override + public boolean isBrained() { + return true; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("NonDroolsPolicyController []"); + return builder.toString(); + } + + @Override + public void updateToVersion(String newGroupId, String newArtifactId, String newVersion, + List<TopicCoderFilterConfiguration> decoderConfigurations, + List<TopicCoderFilterConfiguration> encoderConfigurations) + throws LinkageError { + throw new IllegalStateException(makeInvokeMsg()); + } + + @Override + public Map<String, Integer> factClassNames(String sessionName) { + return new HashMap<>(); + } + + @Override + public long factCount(String sessionName) { + return 0; + } + + @Override + public List<Object> facts(String sessionName, String className, boolean delete) { + return new ArrayList<>(); + } + + @Override + public <T> List<T> facts(@NonNull String sessionName, @NonNull Class<T> clazz) { + return new ArrayList<>(); + } + + @Override + public List<Object> factQuery(String sessionName, String queryName, + String queriedEntity, + boolean delete, Object... queryParams) { + return new ArrayList<>(); + } + + @Override + public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) { + return false; + } + + @Override + public <T> boolean delete(@NonNull T fact) { + return false; + } + + @Override + public <T> boolean delete(@NonNull String sessionName, @NonNull Class<T> fact) { + return false; + } + + @Override + public <T> boolean delete(@NonNull Class<T> fact) { + return false; + } + + private String makeInvokeMsg() { + return this.getClass().getName() + " invoked"; + } + + /** + * remove decoders. + */ + protected void removeDecoders() { + logger.info("REMOVE-DECODERS: {}", this); + + if (this.decoderConfigurations == null) { + return; + } + + + for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) { + String topic = coderConfig.getTopic(); + getCoderManager().removeDecoders(this.getGroupId(), this.getArtifactId(), topic); + } + } + + /** + * remove decoders. + */ + protected void removeEncoders() { + + logger.info("REMOVE-ENCODERS: {}", this); + + if (this.encoderConfigurations == null) { + return; + } + + for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) { + String topic = coderConfig.getTopic(); + getCoderManager().removeEncoders(this.getGroupId(), this.getArtifactId(), topic); + } + } + + /** + * removes this drools controllers and encoders and decoders from operation. + */ + protected void removeCoders() { + logger.info("{}: REMOVE-CODERS", this); + + try { + this.removeDecoders(); + } catch (IllegalArgumentException e) { + logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e); + } + + try { + this.removeEncoders(); + } catch (IllegalArgumentException e) { + logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e); + } + } + + protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties, + List<? extends Topic> topicEntities) { + + List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>(); + + if (topicEntities == null || topicEntities.isEmpty()) { + return topics2DecodedClasses2Filters; + } + + for (Topic topic : topicEntities) { + + // 1. first the topic + + String firstTopic = topic.getTopic(); + + String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic; + + // 2. check if there is a custom decoder for this topic that the user prefers to use + // instead of the ones provided in the platform + + CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix); + + // 3. second the list of classes associated with each topic + + String eventClasses = properties + .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX); + + if (eventClasses == null || eventClasses.isEmpty()) { + logger.warn("There are no event classes for topic {}", firstTopic); + continue; + } + + List<PotentialCoderFilter> classes2Filters = + getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses); + + TopicCoderFilterConfiguration topic2Classes2Filters = + new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder); + topics2DecodedClasses2Filters.add(topic2Classes2Filters); + } + + return topics2DecodedClasses2Filters; + } + + private String getPropertyTopicPrefix(Topic topic) { + boolean isSource = topic instanceof TopicSource; + CommInfrastructure commInfra = topic.getTopicCommInfrastructure(); + if (commInfra == CommInfrastructure.UEB) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.DMAAP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.NOOP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; + } + } else { + throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); + } + } + + private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) { + String customGson = properties.getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX); + + CustomGsonCoder customGsonCoder = null; + if (customGson != null && !customGson.isEmpty()) { + try { + customGsonCoder = new CustomGsonCoder(customGson); + } catch (IllegalArgumentException e) { + logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson, + e.getMessage(), e); + } + } + return customGsonCoder; + } + + private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix, + String eventClasses) { + + List<PotentialCoderFilter> classes2Filters = new ArrayList<>(); + + List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*"))); + + for (String theClass : topicClasses) { + + // 4. for each coder class, get the filter expression + + String filter = properties + .getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX + + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX); + + JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter); + PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter); + classes2Filters.add(class2Filters); + } + + return classes2Filters; + } + + // these may be overridden by junit tests + + protected EventProtocolCoder getCoderManager() { + return EventProtocolCoderConstants.getManager(); + } + + protected OrderedServiceImpl<DroolsControllerFeatureApi> getDroolsProviders() { + return DroolsControllerFeatureApiConstants.getProviders(); + } +} |