aboutsummaryrefslogtreecommitdiffstats
path: root/controlloop/common/eventmanager/src
diff options
context:
space:
mode:
Diffstat (limited to 'controlloop/common/eventmanager/src')
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java34
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java297
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java253
-rw-r--r--controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java72
-rw-r--r--controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java295
-rw-r--r--controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java380
6 files changed, 1331 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java
new file mode 100644
index 000000000..2591a3fa0
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+public class ActorConstants {
+ public static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
+ public static final String LOCK_ACTOR = "LOCK";
+ public static final String LOCK_OPERATION = "Lock";
+
+ public static final String PAYLOAD_KEY_VF_COUNT = "vfCount";
+
+
+ private ActorConstants() {
+ // do nothing
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java
new file mode 100644
index 000000000..5cf087abd
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java
@@ -0,0 +1,297 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.ToString;
+import org.onap.policy.controlloop.ControlLoopException;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
+import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager;
+import org.onap.policy.controlloop.processor.ControlLoopProcessor;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for a single event. Once this has been created, the event can be retracted from
+ * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e.,
+ * hasn't been replicated from another server). When the manager is no longer needed,
+ * {@link #destroy()} should be invoked.
+ */
+@ToString(onlyExplicitlyIncluded = true)
+public class ControlLoopEventManager implements StepContext, Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class);
+ private static final long serialVersionUID = -1216568161322872641L;
+
+ private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager";
+
+ /**
+ * Counts the number of these objects that have been created. This is used by junit
+ * tests.
+ */
+ private static final AtomicLong createCount = new AtomicLong(0);
+
+ /**
+ * {@code True} if this object was created by this JVM instance, {@code false}
+ * otherwise. This will be {@code false} if this object is reconstituted from a
+ * persistent store or by transfer from another server.
+ */
+ private transient boolean createdByThisJvmInstance;
+
+ @Getter
+ @ToString.Include
+ public final String closedLoopControlName;
+ @Getter
+ @ToString.Include
+ private final UUID requestId;
+
+ /**
+ * Time, in milliseconds, when the control loop will time out.
+ */
+ @Getter
+ private final long endTimeMs;
+
+ // fields extracted from the ControlLoopParams
+ @Getter
+ private final String policyName;
+ @Getter
+ private final String policyVersion;
+
+ /**
+ * Maps a target entity to its lock.
+ */
+ private final transient Map<String, LockData> target2lock = new HashMap<>();
+
+ @Getter(AccessLevel.PROTECTED)
+ private final ControlLoopProcessor processor;
+
+ /**
+ * Set of properties used while processing the event.
+ */
+ private Map<String, Serializable> properties = new ConcurrentHashMap<>();
+
+ /**
+ * Unprocessed outcomes from the operations. Outcomes are added to this each time the
+ * "start" or "complete" callback is invoked, typically by an operation running in a
+ * background thread, thus it must be thread safe.
+ */
+ @Getter
+ private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param params control loop parameters
+ * @param requestId event request ID
+ * @throws ControlLoopException if the event is invalid or if a YAML processor cannot
+ * be created
+ */
+ public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException {
+
+ createCount.incrementAndGet();
+
+ this.createdByThisJvmInstance = true;
+ this.closedLoopControlName = params.getClosedLoopControlName();
+ this.requestId = requestId;
+ this.policyName = params.getPolicyName();
+ this.policyVersion = params.getPolicyVersion();
+ this.processor = new ControlLoopProcessor(params.getToscaPolicy());
+ this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs();
+ }
+
+ /**
+ * Gets the number of manager objects that have been created.
+ *
+ * @return the number of manager objects that have been created
+ */
+ public static long getCreateCount() {
+ return createCount.get();
+ }
+
+ /**
+ * Determines if the manager is still active.
+ *
+ * @return {@code true} if the manager is still active, {@code false} otherwise
+ */
+ public boolean isActive() {
+ return createdByThisJvmInstance;
+ }
+
+ /**
+ * Cancels the current operation and frees all locks.
+ */
+ public void destroy() {
+ if (isActive()) {
+ getBlockingExecutor().execute(this::freeAllLocks);
+ }
+ }
+
+ /**
+ * Frees all locks.
+ */
+ private void freeAllLocks() {
+ target2lock.values().forEach(LockData::free);
+ }
+
+ /**
+ * Determines the overall control loop timeout.
+ *
+ * @return the policy timeout, in milliseconds, if specified, a default timeout
+ * otherwise
+ */
+ private long detmControlLoopTimeoutMs() {
+ // validation checks preclude null or 0 timeout values in the policy
+ Integer timeout = processor.getControlLoop().getTimeout();
+ return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Requests a lock. This requests the lock for the time that remains before the
+ * timeout expires. This avoids having to extend the lock.
+ *
+ * @param targetEntity entity to be locked
+ * @return a future that can be used to await the lock
+ */
+ @Override
+ public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) {
+
+ long remainingMs = endTimeMs - System.currentTimeMillis();
+ int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS));
+
+ LockData data = target2lock.computeIfAbsent(targetEntity, key -> {
+ LockData data2 = new LockData(key, requestId);
+ makeLock(targetEntity, requestId.toString(), remainingSec, data2);
+
+ data2.addUnavailableCallback(this::onComplete);
+
+ return data2;
+ });
+
+ return data.getFuture();
+ }
+
+ public void onStart(OperationOutcome outcome) {
+ outcomes.add(outcome);
+ }
+
+ public void onComplete(OperationOutcome outcome) {
+ outcomes.add(outcome);
+ }
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name) {
+ return properties.containsKey(name);
+ }
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value) {
+ logger.error("set property {}={} manager={}", name, value, this);
+ properties.put(name, value);
+ }
+
+ /**
+ * Removes a property.
+ *
+ * @param name property name
+ */
+ public void removeProperty(String name) {
+ properties.remove(name);
+ }
+
+ /**
+ * Initializes various components, on demand.
+ */
+ private static class LazyInitData {
+ private static final OperationHistoryDataManager DATA_MANAGER;
+ private static final ActorService ACTOR_SERVICE;
+
+ static {
+ // TODO how to dynamically change data manager, depending whether or not
+ // guards are enabled?
+ EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG);
+ ACTOR_SERVICE = services.getActorService();
+ DATA_MANAGER = services.getDataManager();
+ }
+ }
+
+ // the following methods may be overridden by junit tests
+
+ public Executor getExecutor() {
+ return ForkJoinPool.commonPool();
+ }
+
+ protected ExecutorService getBlockingExecutor() {
+ return PolicyEngineConstants.getManager().getExecutorService();
+ }
+
+ protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
+ PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false);
+ }
+
+ public ActorService getActorService() {
+ return LazyInitData.ACTOR_SERVICE;
+ }
+
+ public OperationHistoryDataManager getDataManager() {
+ return LazyInitData.DATA_MANAGER;
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java
new file mode 100644
index 000000000..01c64e5bc
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java
@@ -0,0 +1,253 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single step within a single policy. The rules typically break a policy operation down
+ * into separate steps. For instance, a policy for VF-Module-Create would be broken down
+ * into lock target, A&AI tenant query, A&AI custom query, guard, and finally
+ * VF-Module-Create.
+ */
+public class Step {
+ private static final Logger logger = LoggerFactory.getLogger(Step.class);
+
+ @Getter
+ protected ControlLoopOperationParams params;
+
+ /**
+ * Time when the first step started for the current policy. This is shared by all
+ * steps for the policy. When a step is started and finds this to be {@code null}, it
+ * sets the value. Subsequent steps leave it unchanged.
+ */
+ private final AtomicReference<Instant> startTime;
+
+ /**
+ * {@code True} if this step is for the policy's actual operation, {@code false} if it's a preprocessor step.
+ */
+ @Getter
+ private final boolean policyStep;
+
+ /**
+ * The operation for this step.
+ */
+ @Getter
+ private Operation operation = null;
+
+ /**
+ * Used to cancel the running operation.
+ */
+ protected CompletableFuture<OperationOutcome> future;
+
+
+ /**
+ * Constructs the object. This is used when constructing the step for the policy's actual operation.
+ *
+ * @param params operation parameters
+ * @param startTime start time of the first step for the current policy, initially
+ * containing {@code null}
+ */
+ public Step(ControlLoopOperationParams params, @NonNull AtomicReference<Instant> startTime) {
+ this.params = params;
+ this.startTime = startTime;
+ this.policyStep = true;
+ }
+
+ /**
+ * Constructs the object using information from another step. This is used when constructing a preprocessing
+ * step.
+ *
+ * @param otherStep step whose information should be used
+ * @param actor actor name
+ * @param operation operation name
+ */
+ public Step(Step otherStep, String actor, String operation) {
+ this.params = otherStep.params.toBuilder().actor(actor).operation(operation).retry(null).timeoutSec(null)
+ .payload(new LinkedHashMap<>()).build();
+ this.startTime = otherStep.startTime;
+ this.policyStep = false;
+ }
+
+ public String getActorName() {
+ return params.getActor();
+ }
+
+ public String getOperationName() {
+ return params.getOperation();
+ }
+
+ /**
+ * Determines if the operation has been initialized (i.e., created).
+ *
+ * @return {@code true} if the operation has been initialized, {@code false} otherwise
+ */
+ public boolean isInitialized() {
+ return (operation != null);
+ }
+
+ /**
+ * Initializes the step, creating the operation if it has not yet been created.
+ */
+ public void init() {
+ if (operation == null) {
+ operation = buildOperation();
+ }
+ }
+
+ /**
+ * Starts the operation.
+ *
+ * @param remainingMs time remaining, in milliseconds, for the control loop
+ * @return {@code true} if started, {@code false} if the step is no longer necessary
+ * (i.e., because it was previously completed)
+ */
+ public boolean start(long remainingMs) {
+ if (!isInitialized()) {
+ throw new IllegalStateException("step has not been initialized");
+ }
+
+ if (future != null) {
+ throw new IllegalStateException("step is already running");
+ }
+
+ try {
+ initStartTime();
+ future = operation.start();
+
+ // handle any exceptions that may be thrown, set timeout, and handle timeout
+
+ // @formatter:off
+ future.exceptionally(this::handleException)
+ .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
+ .exceptionally(this::handleTimeout);
+ // @formatter:on
+
+ } catch (RuntimeException e) {
+ handleException(e);
+ }
+
+ return true;
+ }
+
+ /**
+ * Handles exceptions that may be generated.
+ *
+ * @param thrown exception that was generated
+ * @return {@code null}
+ */
+ private OperationOutcome handleException(Throwable thrown) {
+ if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+ return null;
+ }
+
+ logger.warn("{}.{}: exception starting operation for {}", params.getActor(), params.getOperation(),
+ params.getRequestId(), thrown);
+ OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+ outcome.setStart(startTime.get());
+ outcome.setEnd(Instant.now());
+ outcome.setFinalOutcome(true);
+ params.getCompleteCallback().accept(outcome);
+
+ // this outcome is not used so just return "null"
+ return null;
+ }
+
+ /**
+ * Handles control loop timeout exception.
+ *
+ * @param thrown exception that was generated
+ * @return {@code null}
+ */
+ private OperationOutcome handleTimeout(Throwable thrown) {
+ logger.warn("{}.{}: control loop timeout for {}", params.getActor(), params.getOperation(),
+ params.getRequestId(), thrown);
+
+ OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown);
+ outcome.setActor(ActorConstants.CL_TIMEOUT_ACTOR);
+ outcome.setOperation(null);
+ outcome.setStart(startTime.get());
+ outcome.setEnd(Instant.now());
+ outcome.setFinalOutcome(true);
+ params.getCompleteCallback().accept(outcome);
+
+ // cancel the operation, if it's still running
+ future.cancel(false);
+
+ // this outcome is not used so just return "null"
+ return null;
+ }
+
+ /**
+ * Cancels the operation, if it's running.
+ */
+ public void cancel() {
+ if (future != null) {
+ future.cancel(false);
+ }
+ }
+
+ /**
+ * Initializes the start time, if it's still unset.
+ */
+ private void initStartTime() {
+ if (startTime.get() == null) {
+ startTime.set(Instant.now());
+ }
+ }
+
+ /**
+ * Gets the start time.
+ *
+ * @return the start time, or {@code null} if it hasn't been initialized yet
+ */
+ public Instant getStartTime() {
+ return startTime.get();
+ }
+
+ /**
+ * Builds the operation. The default method simply invokes
+ * {@link ControlLoopOperationParams#build()}.
+ *
+ * @return a new operation
+ */
+ protected Operation buildOperation() {
+ return params.build();
+ }
+
+ @Override
+ public String toString() {
+ return "Step(actor=" + getActorName() + ", operation=" + getOperationName() + ")";
+ }
+}
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java
new file mode 100644
index 000000000..5251b7acc
--- /dev/null
+++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java
@@ -0,0 +1,72 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+
+/**
+ * Context used by steps to perform their work.
+ */
+public interface StepContext {
+
+ /**
+ * Determines if the context contains a property.
+ *
+ * @param name name of the property of interest
+ * @return {@code true} if the context contains the property, {@code false} otherwise
+ */
+ public boolean contains(String name);
+
+ /**
+ * Gets a property, casting it to the desired type.
+ *
+ * @param <T> desired type
+ * @param name name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not yet have a value
+ */
+ public <T> T getProperty(String name);
+
+ /**
+ * Sets a property's value.
+ *
+ * @param name property name
+ * @param value new property value
+ */
+ public void setProperty(String name, Serializable value);
+
+ /**
+ * Removes a property.
+ *
+ * @param name property name
+ */
+ public void removeProperty(String name);
+
+ /**
+ * Requests a lock. This requests the lock for the time that remains before the
+ * timeout expires. This avoids having to extend the lock.
+ *
+ * @param targetEntity entity to be locked
+ * @return a future that can be used to await the lock
+ */
+ public CompletableFuture<OperationOutcome> requestLock(String targetEntity);
+}
diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java
new file mode 100644
index 000000000..91434d7ea
--- /dev/null
+++ b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java
@@ -0,0 +1,295 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardYamlCoder;
+import org.onap.policy.common.utils.io.Serializer;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.controlloop.ControlLoopException;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.drl.legacy.ControlLoopParams;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.onap.policy.controlloop.policy.Target;
+import org.onap.policy.controlloop.policy.TargetType;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.LockImpl;
+import org.onap.policy.drools.core.lock.LockState;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
+
+public class ControlLoopEventManagerTest {
+ private static final UUID REQ_ID = UUID.randomUUID();
+ private static final String CL_NAME = "my-closed-loop-name";
+ private static final String POLICY_NAME = "my-policy-name";
+ private static final String POLICY_SCOPE = "my-scope";
+ private static final String POLICY_VERSION = "1.2.3";
+ private static final String LOCK1 = "my-lock-A";
+ private static final String LOCK2 = "my-lock-B";
+ private static final Coder yamlCoder = new StandardYamlCoder();
+ private static final String MY_KEY = "def";
+
+ @Mock
+ private ExecutorService executor;
+
+ private long preCreateTimeMs;
+ private List<LockImpl> locks;
+ private Target target;
+ private ToscaPolicy tosca;
+ private ControlLoopParams params;
+ private ControlLoopEventManager mgr;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() throws ControlLoopException, CoderException {
+ MockitoAnnotations.initMocks(this);
+
+ target = new Target();
+ target.setType(TargetType.VNF);
+
+ params = new ControlLoopParams();
+ params.setClosedLoopControlName(CL_NAME);
+ params.setPolicyName(POLICY_NAME);
+ params.setPolicyScope(POLICY_SCOPE);
+ params.setPolicyVersion(POLICY_VERSION);
+
+ loadPolicy("eventManager/event-mgr-simple.yaml");
+
+ locks = new ArrayList<>();
+
+ preCreateTimeMs = System.currentTimeMillis();
+
+ MyManager.executor = executor;
+ MyManager.locks = locks;
+
+ mgr = new MyManager(params, REQ_ID);
+ }
+
+ @Test
+ public void testConstructor() {
+ assertEquals(POLICY_NAME, mgr.getPolicyName());
+
+ assertTrue(mgr.isActive());
+ assertEquals(CL_NAME, mgr.getClosedLoopControlName());
+ assertSame(REQ_ID, mgr.getRequestId());
+ assertEquals(POLICY_NAME, mgr.getPolicyName());
+ assertEquals(POLICY_VERSION, mgr.getPolicyVersion());
+ assertNotNull(mgr.getProcessor());
+ assertThat(mgr.getEndTimeMs()).isGreaterThanOrEqualTo(preCreateTimeMs);
+ }
+
+ @Test
+ public void testGetCreateCount() throws ControlLoopException {
+ long original = ControlLoopEventManager.getCreateCount();
+
+ new MyManager(params, REQ_ID);
+ assertEquals(original + 1, ControlLoopEventManager.getCreateCount());
+
+ new MyManager(params, REQ_ID);
+ assertEquals(original + 2, ControlLoopEventManager.getCreateCount());
+ }
+
+ @Test
+ public void testIsActive() throws Exception {
+ mgr = new ControlLoopEventManager(params, REQ_ID);
+ assertTrue(mgr.isActive());
+
+ ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr);
+ assertFalse(mgr2.isActive());
+ }
+
+ @Test
+ public void testDestroy() throws IOException {
+ mgr.requestLock(LOCK1);
+ mgr.requestLock(LOCK2);
+ mgr.requestLock(LOCK1);
+
+ // ensure destroy() doesn't throw an exception if the object is deserialized
+ ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr);
+ assertThatCode(() -> mgr2.destroy()).doesNotThrowAnyException();
+
+ // locks should not have been freed
+ for (LockImpl lock : locks) {
+ assertFalse(lock.isUnavailable());
+ }
+
+ mgr.destroy();
+
+ runExecutor();
+
+ for (LockImpl lock : locks) {
+ assertTrue(lock.isUnavailable());
+ }
+ }
+
+ @Test
+ public void testDetmControlLoopTimeoutMs() throws Exception {
+ long timeMs = 1200 * 1000L;
+ long end = mgr.getEndTimeMs();
+ assertThat(end).isGreaterThanOrEqualTo(preCreateTimeMs + timeMs);
+ assertThat(end).isLessThan(preCreateTimeMs + timeMs + 5000);
+ }
+
+ @Test
+ public void testRequestLock() {
+ final CompletableFuture<OperationOutcome> future1 = mgr.requestLock(LOCK1);
+ assertTrue(mgr.getOutcomes().isEmpty());
+
+ final CompletableFuture<OperationOutcome> future2 = mgr.requestLock(LOCK2);
+ assertTrue(mgr.getOutcomes().isEmpty());
+
+ assertSame(future1, mgr.requestLock(LOCK1));
+ assertTrue(mgr.getOutcomes().isEmpty());
+
+ assertEquals(2, locks.size());
+
+ assertTrue(future1.isDone());
+ assertTrue(future2.isDone());
+
+ // indicate that the first lock failed
+ locks.get(0).notifyUnavailable();
+
+ verifyLock(PolicyResult.FAILURE);
+ assertTrue(mgr.getOutcomes().isEmpty());
+ }
+
+ private void verifyLock(PolicyResult result) {
+ OperationOutcome outcome = mgr.getOutcomes().poll();
+ assertNotNull(outcome);
+ assertEquals(ActorConstants.LOCK_ACTOR, outcome.getActor());
+ assertEquals(ActorConstants.LOCK_OPERATION, outcome.getOperation());
+ assertNotNull(outcome.getEnd());
+ assertTrue(outcome.isFinalOutcome());
+ assertEquals(result, outcome.getResult());
+ }
+
+ @Test
+ public void testOnStart() {
+ OperationOutcome outcome1 = new OperationOutcome();
+ OperationOutcome outcome2 = new OperationOutcome();
+
+ mgr.onStart(outcome1);
+ mgr.onStart(outcome2);
+
+ assertSame(outcome1, mgr.getOutcomes().poll());
+ assertSame(outcome2, mgr.getOutcomes().poll());
+ assertTrue(mgr.getOutcomes().isEmpty());
+ }
+
+ @Test
+ public void testOnComplete() {
+ OperationOutcome outcome1 = new OperationOutcome();
+ OperationOutcome outcome2 = new OperationOutcome();
+
+ mgr.onComplete(outcome1);
+ mgr.onComplete(outcome2);
+
+ assertSame(outcome1, mgr.getOutcomes().poll());
+ assertSame(outcome2, mgr.getOutcomes().poll());
+ assertTrue(mgr.getOutcomes().isEmpty());
+ }
+
+ @Test
+ public void testContains_testGetProperty_testSetProperty_testRemoveProperty() {
+ mgr.setProperty("abc", "a string");
+ mgr.setProperty(MY_KEY, 100);
+
+ assertTrue(mgr.contains(MY_KEY));
+ assertFalse(mgr.contains("ghi"));
+
+ String strValue = mgr.getProperty("abc");
+ assertEquals("a string", strValue);
+
+ int intValue = mgr.getProperty(MY_KEY);
+ assertEquals(100, intValue);
+
+ mgr.removeProperty(MY_KEY);
+ assertFalse(mgr.contains(MY_KEY));
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(mgr.toString());
+ }
+
+ private void loadPolicy(String fileName) throws CoderException {
+ ToscaServiceTemplate template =
+ yamlCoder.decode(ResourceUtils.getResourceAsString(fileName), ToscaServiceTemplate.class);
+ tosca = template.getToscaTopologyTemplate().getPolicies().get(0).values().iterator().next();
+
+ params.setToscaPolicy(tosca);
+ }
+
+ private void runExecutor() {
+ ArgumentCaptor<Runnable> runCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(executor).execute(runCaptor.capture());
+
+ runCaptor.getValue().run();
+ }
+
+
+ private static class MyManager extends ControlLoopEventManager {
+ private static final long serialVersionUID = 1L;
+
+ private static ExecutorService executor;
+ private static List<LockImpl> locks;
+
+ public MyManager(ControlLoopParams params, UUID requestId)
+ throws ControlLoopException {
+ super(params, requestId);
+ }
+
+ @Override
+ protected ExecutorService getBlockingExecutor() {
+ return executor;
+ }
+
+ @Override
+ protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) {
+ LockImpl lock = new LockImpl(LockState.ACTIVE, targetEntity, requestId, holdSec, callback);
+ locks.add(lock);
+ callback.lockAvailable(lock);
+ }
+ }
+}
diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java
new file mode 100644
index 000000000..1472adc18
--- /dev/null
+++ b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java
@@ -0,0 +1,380 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.eventmanager;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
+import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.onap.policy.controlloop.policy.Policy;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.onap.policy.controlloop.policy.Target;
+import org.onap.policy.controlloop.policy.TargetType;
+
+public class StepTest {
+ private static final UUID REQ_ID = UUID.randomUUID();
+ private static final String POLICY_ID = "my-policy";
+ private static final String POLICY_ACTOR = "my-actor";
+ private static final String POLICY_OPERATION = "my-operation";
+ private static final String MY_TARGET = "my-target";
+ private static final String PAYLOAD_KEY = "payload-key";
+ private static final String PAYLOAD_VALUE = "payload-value";
+ private static final long REMAINING_MS = 5000;
+ private static final Integer POLICY_RETRY = 3;
+ private static final Integer POLICY_TIMEOUT = 20;
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+
+ @Mock
+ private Operator policyOperator;
+ @Mock
+ private Operation policyOperation;
+ @Mock
+ private Actor policyActor;
+ @Mock
+ private ActorService actors;
+
+ private CompletableFuture<OperationOutcome> future;
+ private Target target;
+ private Map<String, String> payload;
+ private Policy policy;
+ private VirtualControlLoopEvent event;
+ private ControlLoopEventContext context;
+ private BlockingQueue<OperationOutcome> starts;
+ private BlockingQueue<OperationOutcome> completions;
+ private ControlLoopOperationParams params;
+ private AtomicReference<Instant> startTime;
+ private Step step;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ future = new CompletableFuture<>();
+
+ // configure policy operation
+ when(actors.getActor(POLICY_ACTOR)).thenReturn(policyActor);
+ when(policyActor.getOperator(POLICY_OPERATION)).thenReturn(policyOperator);
+ when(policyOperator.buildOperation(any())).thenReturn(policyOperation);
+ when(policyOperation.start()).thenReturn(future);
+
+ target = new Target();
+ target.setType(TargetType.VM);
+
+ payload = Map.of(PAYLOAD_KEY, PAYLOAD_VALUE);
+
+ policy = new Policy();
+ policy.setId(POLICY_ID);
+ policy.setActor(POLICY_ACTOR);
+ policy.setRecipe(POLICY_OPERATION);
+ policy.setTarget(target);
+ policy.setPayload(payload);
+ policy.setRetry(POLICY_RETRY);
+ policy.setTimeout(POLICY_TIMEOUT);
+
+ event = new VirtualControlLoopEvent();
+ event.setRequestId(REQ_ID);
+ event.setTarget(ControlLoopOperationManager2.VSERVER_VSERVER_NAME);
+ event.setAai(new TreeMap<>(Map.of(ControlLoopOperationManager2.VSERVER_VSERVER_NAME, MY_TARGET)));
+
+ context = new ControlLoopEventContext(event);
+
+ starts = new LinkedBlockingQueue<>();
+ completions = new LinkedBlockingQueue<>();
+
+ params = ControlLoopOperationParams.builder().actor(POLICY_ACTOR).actorService(actors)
+ .completeCallback(completions::add).context(context).executor(ForkJoinPool.commonPool())
+ .operation(POLICY_OPERATION).payload(new TreeMap<>(payload)).startCallback(starts::add)
+ .target(target).targetEntity(MY_TARGET).build();
+
+ startTime = new AtomicReference<>();
+
+ step = new Step(params, startTime);
+ }
+
+ @Test
+ public void testConstructor() {
+ assertTrue(step.isPolicyStep());
+ assertSame(params, step.getParams());
+
+ // check that it recorded the startTime by starting and checking it
+ step.init();
+ step.start(REMAINING_MS);
+
+ assertNotNull(startTime.get());
+
+ // try with null start time
+ assertThatThrownBy(() -> new Step(params, null)).isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("startTime");
+ }
+
+ @Test
+ public void testConstructorWithOtherStep_testInitStartTime_testGetStartTimeRef() {
+ Step step2 = new Step(step, "actorB", "operB");
+ assertFalse(step2.isPolicyStep());
+
+ ControlLoopOperationParams params2 = step2.getParams();
+ assertEquals("actorB", params2.getActor());
+ assertEquals("operB", params2.getOperation());
+ assertNull(params2.getRetry());
+ assertNull(params2.getTimeoutSec());
+ assertSame(target, params2.getTarget());
+ assertEquals(MY_TARGET, params2.getTargetEntity());
+ assertTrue(params2.getPayload().isEmpty());
+
+ when(actors.getActor(params2.getActor())).thenReturn(policyActor);
+ when(policyActor.getOperator(params2.getOperation())).thenReturn(policyOperator);
+
+ assertNull(step2.getStartTime());
+
+ // check that it recorded the startTime by starting and checking it
+ step2.init();
+ step2.start(REMAINING_MS);
+
+ Instant instant = startTime.get();
+ assertNotNull(instant);
+ assertSame(instant, step2.getStartTime());
+
+ // launch the original step, too, so we can test the other branch of
+ // initStartTime()
+ step.init();
+ step.start(REMAINING_MS);
+
+ assertSame(instant, startTime.get());
+ assertSame(instant, step.getStartTime());
+ }
+
+ @Test
+ public void testGetActorName_testGetOperationName() {
+ assertEquals(POLICY_ACTOR, step.getActorName());
+ assertEquals(POLICY_OPERATION, step.getOperationName());
+ }
+
+ @Test
+ public void testIsInitialized_testInit_testGetOperation() {
+ assertFalse(step.isInitialized());
+
+ // verify it's unchanged
+ assertFalse(step.isInitialized());
+
+ assertNull(step.getOperation());
+
+ step.init();
+
+ assertSame(policyOperation, step.getOperation());
+ assertTrue(step.isInitialized());
+
+ // repeat - should be unchanged
+ step.init();
+ assertSame(policyOperation, step.getOperation());
+ assertTrue(step.isInitialized());
+
+ // repeat without init - should be unchanged
+ assertSame(policyOperation, step.getOperation());
+ assertTrue(step.isInitialized());
+ }
+
+ @Test
+ public void testStart() {
+ assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
+ .withMessage("step has not been initialized");
+
+ // initialize it, by calling getOperation(), and then try again
+ step.init();
+ assertTrue(step.start(REMAINING_MS));
+
+ assertNotNull(startTime.get());
+
+ // should fail if we try again
+ assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
+ .withMessage("step is already running");
+ }
+
+ /**
+ * Tests start() when the operation.start() throws an exception.
+ */
+ @Test
+ public void testStartException() {
+ when(policyOperation.start()).thenThrow(new RuntimeException());
+ step.init();
+
+ assertTrue(step.start(REMAINING_MS));
+
+ // exception should be immediate
+ OperationOutcome outcome = completions.poll();
+ assertNotNull(outcome);
+
+ assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(POLICY_ACTOR, outcome.getActor());
+ assertTrue(outcome.isFinalOutcome());
+ }
+
+ /**
+ * Tests start() when the operation throws an asynchronous exception.
+ */
+ @Test
+ public void testStartAsyncException() {
+ step.init();
+ step.start(REMAINING_MS);
+
+ future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION));
+
+ // exception should be immediate
+ OperationOutcome outcome = completions.poll();
+ assertNotNull(outcome);
+
+ assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(POLICY_ACTOR, outcome.getActor());
+ assertTrue(outcome.isFinalOutcome());
+ }
+
+ /**
+ * Tests handleException() when the exception is a CancellationException.
+ */
+ @Test
+ public void testHandleExceptionCancellationException() {
+ step.init();
+ step.start(REMAINING_MS);
+
+ future.completeExceptionally(new CancellationException(EXPECTED_EXCEPTION));
+
+ // should not have generated an outcome
+ assertNull(completions.peek());
+ }
+
+ @Test
+ public void testHandleExceptionCauseCancellationException() {
+ step.init();
+ step.start(REMAINING_MS);
+
+ future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION, new CancellationException()));
+
+ // should not have generated an outcome
+ assertNull(completions.peek());
+ }
+
+ @Test
+ public void testHandleException() {
+ when(policyOperation.start()).thenThrow(new RuntimeException());
+
+ step.init();
+
+ assertTrue(step.start(REMAINING_MS));
+
+ // exception should be immediate
+ OperationOutcome outcome = completions.poll();
+ assertNotNull(outcome);
+
+ assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(POLICY_ACTOR, outcome.getActor());
+ assertTrue(outcome.isFinalOutcome());
+ assertEquals(POLICY_OPERATION, outcome.getOperation());
+ assertSame(startTime.get(), outcome.getStart());
+ assertNotNull(outcome.getEnd());
+ assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
+ }
+
+ @Test
+ public void testHandleTimeout() throws InterruptedException {
+ step.init();
+
+ long tstart = System.currentTimeMillis();
+
+ // give it a short timeout
+ step.start(100);
+
+ OperationOutcome outcome = completions.poll(5, TimeUnit.SECONDS);
+ assertNotNull(outcome);
+
+ // should not have timed out before 100ms
+ assertTrue(tstart + 100 <= System.currentTimeMillis());
+
+ // must wait for the future to complete before checking that it was cancelled
+ assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)).isInstanceOf(Exception.class);
+
+ // verify that the future was cancelled
+ assertTrue(future.isCancelled());
+
+ assertNotEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(ActorConstants.CL_TIMEOUT_ACTOR, outcome.getActor());
+ assertTrue(outcome.isFinalOutcome());
+ assertNull(outcome.getOperation());
+ assertSame(startTime.get(), outcome.getStart());
+ assertNotNull(outcome.getEnd());
+ assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
+ }
+
+ @Test
+ public void testCancel() {
+ // should have no effect
+ step.cancel();
+
+ step.init();
+
+ step.start(REMAINING_MS);
+ step.cancel();
+
+ assertTrue(future.isCancelled());
+ }
+
+ @Test
+ public void testBuildOperation() {
+ assertSame(policyOperation, step.buildOperation());
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(step.toString());
+ }
+}