summaryrefslogtreecommitdiffstats
path: root/controlloop/common/eventmanager/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-08-20 10:11:54 -0400
committerJim Hahn <jrh3@att.com>2020-08-20 11:00:06 -0400
commit56efff004af2d1be64c67f7c8091cb4553a0e86b (patch)
treee75f3248c9a25f62a5d2a9483616cfafad4927ad /controlloop/common/eventmanager/src/main
parent6656a9fc98c178351122fd326940fbb6923d292f (diff)
Add generic eventmanager classes
Added classes that are event-agnostic and support moving control from java into rules. Updates per review comments: - removed policy scope Issue-ID: POLICY-2748-event-mgr Change-Id: Icf811cc25a3975543fc5c725766b7b9df2bb87b0 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'controlloop/common/eventmanager/src/main')
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java34
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java297
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java253
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java72
4 files changed, 656 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java
new file mode 100644
index 000000000..2591a3fa0
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+public class ActorConstants {
+ public static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
+ public static final String LOCK_ACTOR = "LOCK";
+ public static final String LOCK_OPERATION = "Lock";
+
+ public static final String PAYLOAD_KEY_VF_COUNT = "vfCount";
+
+
+ private ActorConstants() {
+ // do nothing
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java
new file mode 100644
index 000000000..5cf087abd
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java
@@ -0,0 +1,297 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.ToString;
+import org.onap.policy.controlloop.ControlLoopException;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
+import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
+import org.onap.policy.controlloop.processor.ControlLoopProcessor;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for a single event. Once this has been created, the event can be retracted from
+ * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
+ * hasn't been replicated from another server). When the manager is no longer needed,
+ * {@link #destroy()} should be invoked.
+ */
+@ToString(onlyExplicitlyIncluded = true)
+public class ControlLoopEventManager implements StepContext, Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
+ private static final long serialVersionUID = -1216568161322872641L;
+
+ private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
+
+ /**
+ * Counts the number of these objects that have been created. This is used by junit
+ * tests.
+ */
+ private static final AtomicLong createCount = new AtomicLong(0);
+
+ /**
+ * {@code True} if this object was created by this JVM instance, {@code false}
+ * otherwise. This will be {@code false} if this object is reconstituted from a
+ * persistent store or by transfer from another server.
+ */
+ private transient boolean createdByThisJvmInstance;
+
+ @Getter
+ @ToString.Include
+ public final String closedLoopControlName;
+ @Getter
+ @ToString.Include
+ private final UUID requestId;
+
+ /**
+ * Time, in milliseconds, when the control loop will time out.
+ */
+ @Getter
+ private final long endTimeMs;
+
+ // fields extracted from the ControlLoopParams
+ @Getter
+ private final String policyName;
+ @Getter
+ private final String policyVersion;
+
+ /**
+ * Maps a target entity to its lock.
+ */
+ private final transient Map<String, LockData> target2lock = new HashMap<>();
+
+ @Getter(AccessLevel.PROTECTED)
+ private final ControlLoopProcessor processor;
+
+ /**
+ * Set of properties used while processing the event.
+ */
+ private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+ /**
+ * Unprocessed outcomes from the operations. Outcomes are added to this each time the
+ * "start" or "complete" callback is invoked, typically by an operation running in a
+ * background thread, thus it must be thread safe.
+ */
+ @Getter
+ private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param params control loop parameters
+ * @param requestId event request ID
+ * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
+ * be created
+ */
+ public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
+
+ createCount.incrementAndGet();
+
+ this.createdByThisJvmInstance = true;
+ this.closedLoopControlName = params.getClosedLoopControlName();
+ this.requestId = requestId;
+ this.policyName = params.getPolicyName();
+ this.policyVersion = params.getPolicyVersion();
+ this.processor = new ControlLoopProcessor(params.getToscaPolicy());
+ this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
+ }
+
+ /**
+ * Gets the number of manager objects that have been created.
+ *
+ * @return the number of manager objects that have been created
+ */
+ public static long getCreateCount() {
+ return createCount.get();
+ }
+
+ /**
+ * Determines if the manager is still active.
+ *
+ * @return {@code true} if the manager is still active, {@code false} otherwise
+ */
+ public boolean isActive() {
+ return createdByThisJvmInstance;
+ }
+
+ /**
+ * Cancels the current operation and frees all locks.
+ */
+ public void destroy() {
+ if (isActive()) {
+ getBlockingExecutor().execute(this::freeAllLocks);
+ }
+ }
+
+ /**
+ * Frees all locks.
+ */
+ private void freeAllLocks() {
+ target2lock.values().forEach(LockData::free);
+ }
+
+ /**
+ * Determines the overall control loop timeout.
+ *
+ * @return the policy timeout, in milliseconds, if specified, a default timeout
+ * otherwise
+ */
+ private long detmControlLoopTimeoutMs() {
+ // validation checks preclude null or 0 timeout values in the policy
+ Integer timeout = processor.getControlLoop().getTimeout();
+ return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Requests a lock. This requests the lock for the time that remains before the
+ * timeout expires. This avoids having to extend the lock.
+ *
+ * @param targetEntity entity to be locked
+ * @return a future that can be used to await the lock
+ */
+ @Override
+ public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
+
+ long remainingMs = endTimeMs - System.currentTimeMillis();
+ int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
+
+ LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
+ LockData data2 = new LockData(key, requestId);
+ makeLock(targetEntity, requestId.toString(), remainingSec, data2);
+
+ data2.addUnavailableCallback(this::onComplete);
+
+ return data2;
+ });
+
+ return data.getFuture();
+ }
+
+ public void onStart(OperationOutcome outcome) {
+ outcomes.add(outcome);
+ }
+
+ public void onComplete(OperationOutcome outcome) {
+ outcomes.add(outcome);
+ }
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return properties.containsKey(name);
+ }
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value) {
+ logger.error("set property {}={} manager={}", name, value, this);
+ properties.put(name, value);
+ }
+
+ /**
+ * Removes a property.
+ *
+ * @param name property name
+ */
+ public void removeProperty(String name) {
+ properties.remove(name);
+ }
+
+ /**
+ * Initializes various components, on demand.
+ */
+ private static class LazyInitData {
+ private static final OperationHistoryDataManager DATA_MANAGER;
+ private static final ActorService ACTOR_SERVICE;
+
+ static {
+ // TODO how to dynamically change data manager, depending whether or not
+ // guards are enabled?
+ EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
+ ACTOR_SERVICE = services.getActorService();
+ DATA_MANAGER = services.getDataManager();
+ }
+ }
+
+ // the following methods may be overridden by junit tests
+
+ public Executor getExecutor() {
+ return ForkJoinPool.commonPool();
+ }
+
+ protected ExecutorService getBlockingExecutor() {
+ return PolicyEngineConstants.getManager().getExecutorService();
+ }
+
+ protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
+ PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
+ }
+
+ public ActorService getActorService() {
+ return LazyInitData.ACTOR_SERVICE;
+ }
+
+ public OperationHistoryDataManager getDataManager() {
+ return LazyInitData.DATA_MANAGER;
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java
new file mode 100644
index 000000000..01c64e5bc
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java
@@ -0,0 +1,253 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single step within a single policy. The rules typically break a policy operation down
+ * into separate steps. For instance, a policy for VF-Module-Create would be broken down
+ * into lock target, A&AI tenant query, A&AI custom query, guard, and finally
+ * VF-Module-Create.
+ */
+public class Step {
+ private static final Logger logger = LoggerFactory.getLogger(Step.class);
+
+ @Getter
+ protected ControlLoopOperationParams params;
+
+ /**
+ * Time when the first step started for the current policy. This is shared by all
+ * steps for the policy. When a step is started and finds this to be {@code null}, it
+ * sets the value. Subsequent steps leave it unchanged.
+ */
+ private final AtomicReference<Instant> startTime;
+
+ /**
+ * {@code True} if this step is for the policy's actual operation, {@code false} if it's a preprocessor step.
+ */
+ @Getter
+ private final boolean policyStep;
+
+ /**
+ * The operation for this step.
+ */
+ @Getter
+ private Operation operation = null;
+
+ /**
+ * Used to cancel the running operation.
+ */
+ protected CompletableFuture<OperationOutcome> future;
+
+
+ /**
+ * Constructs the object. This is used when constructing the step for the policy's actual operation.
+ *
+ * @param params operation parameters
+ * @param startTime start time of the first step for the current policy, initially
+ * containing {@code null}
+ */
+ public Step(ControlLoopOperationParams params, @NonNull AtomicReference<Instant> startTime) {
+ this.params = params;
+ this.startTime = startTime;
+ this.policyStep = true;
+ }
+
+ /**
+ * Constructs the object using information from another step. This is used when constructing a preprocessing
+ * step.
+ *
+ * @param otherStep step whose information should be used
+ * @param actor actor name
+ * @param operation operation name
+ */
+ public Step(Step otherStep, String actor, String operation) {
+ this.params = otherStep.params.toBuilder().actor(actor).operation(operation).retry(null).timeoutSec(null)
+ .payload(new LinkedHashMap<>()).build();
+ this.startTime = otherStep.startTime;
+ this.policyStep = false;
+ }
+
+ public String getActorName() {
+ return params.getActor();
+ }
+
+ public String getOperationName() {
+ return params.getOperation();
+ }
+
+ /**
+ * Determines if the operation has been initialized (i.e., created).
+ *
+ * @return {@code true} if the operation has been initialized, {@code false} otherwise
+ */
+ public boolean isInitialized() {
+ return (operation != null);
+ }
+
+ /**
+ * Initializes the step, creating the operation if it has not yet been created.
+ */
+ public void init() {
+ if (operation == null) {
+ operation = buildOperation();
+ }
+ }
+
+ /**
+ * Starts the operation.
+ *
+ * @param remainingMs time remaining, in milliseconds, for the control loop
+ * @return {@code true} if started, {@code false} if the step is no longer necessary
+ * (i.e., because it was previously completed)
+ */
+ public boolean start(long remainingMs) {
+ if (!isInitialized()) {
+ throw new IllegalStateException("step has not been initialized");
+ }
+
+ if (future != null) {
+ throw new IllegalStateException("step is already running");
+ }
+
+ try {
+ initStartTime();
+ future = operation.start();
+
+ // handle any exceptions that may be thrown, set timeout, and handle timeout
+
+ // @formatter:off
+ future.exceptionally(this::handleException)
+ .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
+ .exceptionally(this::handleTimeout);
+ // @formatter:on
+
+ } catch (RuntimeException e) {
+ handleException(e);
+ }
+
+ return true;
+ }
+
+ /**
+ * Handles exceptions that may be generated.
+ *
+ * @param thrown exception that was generated
+ * @return {@code null}
+ */
+ private OperationOutcome handleException(Throwable thrown) {
+ if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+ return null;
+ }
+
+ logger.warn("{}.{}: exception starting operation for {}", params.getActor(), params.getOperation(),
+ params.getRequestId(), thrown);
+ OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+ outcome.setStart(startTime.get());
+ outcome.setEnd(Instant.now());
+ outcome.setFinalOutcome(true);
+ params.getCompleteCallback().accept(outcome);
+
+ // this outcome is not used so just return "null"
+ return null;
+ }
+
+ /**
+ * Handles control loop timeout exception.
+ *
+ * @param thrown exception that was generated
+ * @return {@code null}
+ */
+ private OperationOutcome handleTimeout(Throwable thrown) {
+ logger.warn("{}.{}: control loop timeout for {}", params.getActor(), params.getOperation(),
+ params.getRequestId(), thrown);
+
+ OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+ outcome.setActor(ActorConstants.CL_TIMEOUT_ACTOR);
+ outcome.setOperation(null);
+ outcome.setStart(startTime.get());
+ outcome.setEnd(Instant.now());
+ outcome.setFinalOutcome(true);
+ params.getCompleteCallback().accept(outcome);
+
+ // cancel the operation, if it's still running
+ future.cancel(false);
+
+ // this outcome is not used so just return "null"
+ return null;
+ }
+
+ /**
+ * Cancels the operation, if it's running.
+ */
+ public void cancel() {
+ if (future != null) {
+ future.cancel(false);
+ }
+ }
+
+ /**
+ * Initializes the start time, if it's still unset.
+ */
+ private void initStartTime() {
+ if (startTime.get() == null) {
+ startTime.set(Instant.now());
+ }
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if it hasn't been initialized yet
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Builds the operation. The default method simply invokes
+ * {@link ControlLoopOperationParams#build()}.
+ *
+ * @return a new operation
+ */
+ protected Operation buildOperation() {
+ return params.build();
+ }
+
+ @Override
+ public String toString() {
+ return "Step(actor=" + getActorName() + ", operation=" + getOperationName() + ")";
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java
new file mode 100644
index 000000000..5251b7acc
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java
@@ -0,0 +1,72 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+
+/**
+ * Context used by steps to perform their work.
+ */
+public interface StepContext {
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name);
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ public <T> T getProperty(String name);
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value);
+
+ /**
+ * Removes a property.
+ *
+ * @param name property name
+ */
+ public void removeProperty(String name);
+
+ /**
+ * Requests a lock. This requests the lock for the time that remains before the
+ * timeout expires. This avoids having to extend the lock.
+ *
+ * @param targetEntity entity to be locked
+ * @return a future that can be used to await the lock
+ */
+ public CompletableFuture<OperationOutcome> requestLock(String targetEntity);
+}