diff options
author | Jim Hahn <jrh3@att.com> | 2020-08-20 10:11:54 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-08-20 11:00:06 -0400 |
commit | 56efff004af2d1be64c67f7c8091cb4553a0e86b (patch) | |
tree | e75f3248c9a25f62a5d2a9483616cfafad4927ad /controlloop/common/eventmanager | |
parent | 6656a9fc98c178351122fd326940fbb6923d292f (diff) |
Add generic eventmanager classes
Added classes that are event-agnostic and support moving control from
java into rules.
Updates per review comments:
- removed policy scope
Issue-ID: POLICY-2748-event-mgr
Change-Id: Icf811cc25a3975543fc5c725766b7b9df2bb87b0
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'controlloop/common/eventmanager')
6 files changed, 1331 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java new file mode 100644 index 000000000..2591a3fa0 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +public class ActorConstants { + public static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-"; + public static final String LOCK_ACTOR = "LOCK"; + public static final String LOCK_OPERATION = "Lock"; + + public static final String PAYLOAD_KEY_VF_COUNT = "vfCount"; + + + private ActorConstants() { + // do nothing + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java new file mode 100644 index 000000000..5cf087abd --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java @@ -0,0 +1,297 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.ToString; +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager; +import org.onap.policy.controlloop.processor.ControlLoopProcessor; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for a single event. Once this has been created, the event can be retracted from + * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e., + * hasn't been replicated from another server). When the manager is no longer needed, + * {@link #destroy()} should be invoked. + */ +@ToString(onlyExplicitlyIncluded = true) +public class ControlLoopEventManager implements StepContext, Serializable { + + private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class); + private static final long serialVersionUID = -1216568161322872641L; + + private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager"; + + /** + * Counts the number of these objects that have been created. This is used by junit + * tests. + */ + private static final AtomicLong createCount = new AtomicLong(0); + + /** + * {@code True} if this object was created by this JVM instance, {@code false} + * otherwise. This will be {@code false} if this object is reconstituted from a + * persistent store or by transfer from another server. + */ + private transient boolean createdByThisJvmInstance; + + @Getter + @ToString.Include + public final String closedLoopControlName; + @Getter + @ToString.Include + private final UUID requestId; + + /** + * Time, in milliseconds, when the control loop will time out. + */ + @Getter + private final long endTimeMs; + + // fields extracted from the ControlLoopParams + @Getter + private final String policyName; + @Getter + private final String policyVersion; + + /** + * Maps a target entity to its lock. + */ + private final transient Map<String, LockData> target2lock = new HashMap<>(); + + @Getter(AccessLevel.PROTECTED) + private final ControlLoopProcessor processor; + + /** + * Set of properties used while processing the event. + */ + private Map<String, Serializable> properties = new ConcurrentHashMap<>(); + + /** + * Unprocessed outcomes from the operations. Outcomes are added to this each time the + * "start" or "complete" callback is invoked, typically by an operation running in a + * background thread, thus it must be thread safe. + */ + @Getter + private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>(); + + + /** + * Constructs the object. + * + * @param params control loop parameters + * @param requestId event request ID + * @throws ControlLoopException if the event is invalid or if a YAML processor cannot + * be created + */ + public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException { + + createCount.incrementAndGet(); + + this.createdByThisJvmInstance = true; + this.closedLoopControlName = params.getClosedLoopControlName(); + this.requestId = requestId; + this.policyName = params.getPolicyName(); + this.policyVersion = params.getPolicyVersion(); + this.processor = new ControlLoopProcessor(params.getToscaPolicy()); + this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs(); + } + + /** + * Gets the number of manager objects that have been created. + * + * @return the number of manager objects that have been created + */ + public static long getCreateCount() { + return createCount.get(); + } + + /** + * Determines if the manager is still active. + * + * @return {@code true} if the manager is still active, {@code false} otherwise + */ + public boolean isActive() { + return createdByThisJvmInstance; + } + + /** + * Cancels the current operation and frees all locks. + */ + public void destroy() { + if (isActive()) { + getBlockingExecutor().execute(this::freeAllLocks); + } + } + + /** + * Frees all locks. + */ + private void freeAllLocks() { + target2lock.values().forEach(LockData::free); + } + + /** + * Determines the overall control loop timeout. + * + * @return the policy timeout, in milliseconds, if specified, a default timeout + * otherwise + */ + private long detmControlLoopTimeoutMs() { + // validation checks preclude null or 0 timeout values in the policy + Integer timeout = processor.getControlLoop().getTimeout(); + return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS); + } + + /** + * Requests a lock. This requests the lock for the time that remains before the + * timeout expires. This avoids having to extend the lock. + * + * @param targetEntity entity to be locked + * @return a future that can be used to await the lock + */ + @Override + public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) { + + long remainingMs = endTimeMs - System.currentTimeMillis(); + int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS)); + + LockData data = target2lock.computeIfAbsent(targetEntity, key -> { + LockData data2 = new LockData(key, requestId); + makeLock(targetEntity, requestId.toString(), remainingSec, data2); + + data2.addUnavailableCallback(this::onComplete); + + return data2; + }); + + return data.getFuture(); + } + + public void onStart(OperationOutcome outcome) { + outcomes.add(outcome); + } + + public void onComplete(OperationOutcome outcome) { + outcomes.add(outcome); + } + + /** + * Determines if the context contains a property. + * + * @param name name of the property of interest + * @return {@code true} if the context contains the property, {@code false} otherwise + */ + public boolean contains(String name) { + return properties.containsKey(name); + } + + /** + * Gets a property, casting it to the desired type. + * + * @param <T> desired type + * @param name name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not yet have a value + */ + @SuppressWarnings("unchecked") + public <T> T getProperty(String name) { + return (T) properties.get(name); + } + + /** + * Sets a property's value. + * + * @param name property name + * @param value new property value + */ + public void setProperty(String name, Serializable value) { + logger.error("set property {}={} manager={}", name, value, this); + properties.put(name, value); + } + + /** + * Removes a property. + * + * @param name property name + */ + public void removeProperty(String name) { + properties.remove(name); + } + + /** + * Initializes various components, on demand. + */ + private static class LazyInitData { + private static final OperationHistoryDataManager DATA_MANAGER; + private static final ActorService ACTOR_SERVICE; + + static { + // TODO how to dynamically change data manager, depending whether or not + // guards are enabled? + EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG); + ACTOR_SERVICE = services.getActorService(); + DATA_MANAGER = services.getDataManager(); + } + } + + // the following methods may be overridden by junit tests + + public Executor getExecutor() { + return ForkJoinPool.commonPool(); + } + + protected ExecutorService getBlockingExecutor() { + return PolicyEngineConstants.getManager().getExecutorService(); + } + + protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) { + PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false); + } + + public ActorService getActorService() { + return LazyInitData.ACTOR_SERVICE; + } + + public OperationHistoryDataManager getDataManager() { + return LazyInitData.DATA_MANAGER; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java new file mode 100644 index 000000000..01c64e5bc --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java @@ -0,0 +1,253 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.Getter; +import lombok.NonNull; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A single step within a single policy. The rules typically break a policy operation down + * into separate steps. For instance, a policy for VF-Module-Create would be broken down + * into lock target, A&AI tenant query, A&AI custom query, guard, and finally + * VF-Module-Create. + */ +public class Step { + private static final Logger logger = LoggerFactory.getLogger(Step.class); + + @Getter + protected ControlLoopOperationParams params; + + /** + * Time when the first step started for the current policy. This is shared by all + * steps for the policy. When a step is started and finds this to be {@code null}, it + * sets the value. Subsequent steps leave it unchanged. + */ + private final AtomicReference<Instant> startTime; + + /** + * {@code True} if this step is for the policy's actual operation, {@code false} if it's a preprocessor step. + */ + @Getter + private final boolean policyStep; + + /** + * The operation for this step. + */ + @Getter + private Operation operation = null; + + /** + * Used to cancel the running operation. + */ + protected CompletableFuture<OperationOutcome> future; + + + /** + * Constructs the object. This is used when constructing the step for the policy's actual operation. + * + * @param params operation parameters + * @param startTime start time of the first step for the current policy, initially + * containing {@code null} + */ + public Step(ControlLoopOperationParams params, @NonNull AtomicReference<Instant> startTime) { + this.params = params; + this.startTime = startTime; + this.policyStep = true; + } + + /** + * Constructs the object using information from another step. This is used when constructing a preprocessing + * step. + * + * @param otherStep step whose information should be used + * @param actor actor name + * @param operation operation name + */ + public Step(Step otherStep, String actor, String operation) { + this.params = otherStep.params.toBuilder().actor(actor).operation(operation).retry(null).timeoutSec(null) + .payload(new LinkedHashMap<>()).build(); + this.startTime = otherStep.startTime; + this.policyStep = false; + } + + public String getActorName() { + return params.getActor(); + } + + public String getOperationName() { + return params.getOperation(); + } + + /** + * Determines if the operation has been initialized (i.e., created). + * + * @return {@code true} if the operation has been initialized, {@code false} otherwise + */ + public boolean isInitialized() { + return (operation != null); + } + + /** + * Initializes the step, creating the operation if it has not yet been created. + */ + public void init() { + if (operation == null) { + operation = buildOperation(); + } + } + + /** + * Starts the operation. + * + * @param remainingMs time remaining, in milliseconds, for the control loop + * @return {@code true} if started, {@code false} if the step is no longer necessary + * (i.e., because it was previously completed) + */ + public boolean start(long remainingMs) { + if (!isInitialized()) { + throw new IllegalStateException("step has not been initialized"); + } + + if (future != null) { + throw new IllegalStateException("step is already running"); + } + + try { + initStartTime(); + future = operation.start(); + + // handle any exceptions that may be thrown, set timeout, and handle timeout + + // @formatter:off + future.exceptionally(this::handleException) + .orTimeout(remainingMs, TimeUnit.MILLISECONDS) + .exceptionally(this::handleTimeout); + // @formatter:on + + } catch (RuntimeException e) { + handleException(e); + } + + return true; + } + + /** + * Handles exceptions that may be generated. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleException(Throwable thrown) { + if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { + return null; + } + + logger.warn("{}.{}: exception starting operation for {}", params.getActor(), params.getOperation(), + params.getRequestId(), thrown); + OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown); + outcome.setStart(startTime.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + params.getCompleteCallback().accept(outcome); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Handles control loop timeout exception. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleTimeout(Throwable thrown) { + logger.warn("{}.{}: control loop timeout for {}", params.getActor(), params.getOperation(), + params.getRequestId(), thrown); + + OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown); + outcome.setActor(ActorConstants.CL_TIMEOUT_ACTOR); + outcome.setOperation(null); + outcome.setStart(startTime.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + params.getCompleteCallback().accept(outcome); + + // cancel the operation, if it's still running + future.cancel(false); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Cancels the operation, if it's running. + */ + public void cancel() { + if (future != null) { + future.cancel(false); + } + } + + /** + * Initializes the start time, if it's still unset. + */ + private void initStartTime() { + if (startTime.get() == null) { + startTime.set(Instant.now()); + } + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if it hasn't been initialized yet + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Builds the operation. The default method simply invokes + * {@link ControlLoopOperationParams#build()}. + * + * @return a new operation + */ + protected Operation buildOperation() { + return params.build(); + } + + @Override + public String toString() { + return "Step(actor=" + getActorName() + ", operation=" + getOperationName() + ")"; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java new file mode 100644 index 000000000..5251b7acc --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java @@ -0,0 +1,72 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; + +/** + * Context used by steps to perform their work. + */ +public interface StepContext { + + /** + * Determines if the context contains a property. + * + * @param name name of the property of interest + * @return {@code true} if the context contains the property, {@code false} otherwise + */ + public boolean contains(String name); + + /** + * Gets a property, casting it to the desired type. + * + * @param <T> desired type + * @param name name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not yet have a value + */ + public <T> T getProperty(String name); + + /** + * Sets a property's value. + * + * @param name property name + * @param value new property value + */ + public void setProperty(String name, Serializable value); + + /** + * Removes a property. + * + * @param name property name + */ + public void removeProperty(String name); + + /** + * Requests a lock. This requests the lock for the time that remains before the + * timeout expires. This avoids having to extend the lock. + * + * @param targetEntity entity to be locked + * @return a future that can be used to await the lock + */ + public CompletableFuture<OperationOutcome> requestLock(String targetEntity); +} diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java new file mode 100644 index 000000000..91434d7ea --- /dev/null +++ b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManagerTest.java @@ -0,0 +1,295 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardYamlCoder; +import org.onap.policy.common.utils.io.Serializer; +import org.onap.policy.common.utils.resources.ResourceUtils; +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.onap.policy.controlloop.policy.Target; +import org.onap.policy.controlloop.policy.TargetType; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.LockImpl; +import org.onap.policy.drools.core.lock.LockState; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; + +public class ControlLoopEventManagerTest { + private static final UUID REQ_ID = UUID.randomUUID(); + private static final String CL_NAME = "my-closed-loop-name"; + private static final String POLICY_NAME = "my-policy-name"; + private static final String POLICY_SCOPE = "my-scope"; + private static final String POLICY_VERSION = "1.2.3"; + private static final String LOCK1 = "my-lock-A"; + private static final String LOCK2 = "my-lock-B"; + private static final Coder yamlCoder = new StandardYamlCoder(); + private static final String MY_KEY = "def"; + + @Mock + private ExecutorService executor; + + private long preCreateTimeMs; + private List<LockImpl> locks; + private Target target; + private ToscaPolicy tosca; + private ControlLoopParams params; + private ControlLoopEventManager mgr; + + /** + * Sets up. + */ + @Before + public void setUp() throws ControlLoopException, CoderException { + MockitoAnnotations.initMocks(this); + + target = new Target(); + target.setType(TargetType.VNF); + + params = new ControlLoopParams(); + params.setClosedLoopControlName(CL_NAME); + params.setPolicyName(POLICY_NAME); + params.setPolicyScope(POLICY_SCOPE); + params.setPolicyVersion(POLICY_VERSION); + + loadPolicy("eventManager/event-mgr-simple.yaml"); + + locks = new ArrayList<>(); + + preCreateTimeMs = System.currentTimeMillis(); + + MyManager.executor = executor; + MyManager.locks = locks; + + mgr = new MyManager(params, REQ_ID); + } + + @Test + public void testConstructor() { + assertEquals(POLICY_NAME, mgr.getPolicyName()); + + assertTrue(mgr.isActive()); + assertEquals(CL_NAME, mgr.getClosedLoopControlName()); + assertSame(REQ_ID, mgr.getRequestId()); + assertEquals(POLICY_NAME, mgr.getPolicyName()); + assertEquals(POLICY_VERSION, mgr.getPolicyVersion()); + assertNotNull(mgr.getProcessor()); + assertThat(mgr.getEndTimeMs()).isGreaterThanOrEqualTo(preCreateTimeMs); + } + + @Test + public void testGetCreateCount() throws ControlLoopException { + long original = ControlLoopEventManager.getCreateCount(); + + new MyManager(params, REQ_ID); + assertEquals(original + 1, ControlLoopEventManager.getCreateCount()); + + new MyManager(params, REQ_ID); + assertEquals(original + 2, ControlLoopEventManager.getCreateCount()); + } + + @Test + public void testIsActive() throws Exception { + mgr = new ControlLoopEventManager(params, REQ_ID); + assertTrue(mgr.isActive()); + + ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr); + assertFalse(mgr2.isActive()); + } + + @Test + public void testDestroy() throws IOException { + mgr.requestLock(LOCK1); + mgr.requestLock(LOCK2); + mgr.requestLock(LOCK1); + + // ensure destroy() doesn't throw an exception if the object is deserialized + ControlLoopEventManager mgr2 = Serializer.roundTrip(mgr); + assertThatCode(() -> mgr2.destroy()).doesNotThrowAnyException(); + + // locks should not have been freed + for (LockImpl lock : locks) { + assertFalse(lock.isUnavailable()); + } + + mgr.destroy(); + + runExecutor(); + + for (LockImpl lock : locks) { + assertTrue(lock.isUnavailable()); + } + } + + @Test + public void testDetmControlLoopTimeoutMs() throws Exception { + long timeMs = 1200 * 1000L; + long end = mgr.getEndTimeMs(); + assertThat(end).isGreaterThanOrEqualTo(preCreateTimeMs + timeMs); + assertThat(end).isLessThan(preCreateTimeMs + timeMs + 5000); + } + + @Test + public void testRequestLock() { + final CompletableFuture<OperationOutcome> future1 = mgr.requestLock(LOCK1); + assertTrue(mgr.getOutcomes().isEmpty()); + + final CompletableFuture<OperationOutcome> future2 = mgr.requestLock(LOCK2); + assertTrue(mgr.getOutcomes().isEmpty()); + + assertSame(future1, mgr.requestLock(LOCK1)); + assertTrue(mgr.getOutcomes().isEmpty()); + + assertEquals(2, locks.size()); + + assertTrue(future1.isDone()); + assertTrue(future2.isDone()); + + // indicate that the first lock failed + locks.get(0).notifyUnavailable(); + + verifyLock(PolicyResult.FAILURE); + assertTrue(mgr.getOutcomes().isEmpty()); + } + + private void verifyLock(PolicyResult result) { + OperationOutcome outcome = mgr.getOutcomes().poll(); + assertNotNull(outcome); + assertEquals(ActorConstants.LOCK_ACTOR, outcome.getActor()); + assertEquals(ActorConstants.LOCK_OPERATION, outcome.getOperation()); + assertNotNull(outcome.getEnd()); + assertTrue(outcome.isFinalOutcome()); + assertEquals(result, outcome.getResult()); + } + + @Test + public void testOnStart() { + OperationOutcome outcome1 = new OperationOutcome(); + OperationOutcome outcome2 = new OperationOutcome(); + + mgr.onStart(outcome1); + mgr.onStart(outcome2); + + assertSame(outcome1, mgr.getOutcomes().poll()); + assertSame(outcome2, mgr.getOutcomes().poll()); + assertTrue(mgr.getOutcomes().isEmpty()); + } + + @Test + public void testOnComplete() { + OperationOutcome outcome1 = new OperationOutcome(); + OperationOutcome outcome2 = new OperationOutcome(); + + mgr.onComplete(outcome1); + mgr.onComplete(outcome2); + + assertSame(outcome1, mgr.getOutcomes().poll()); + assertSame(outcome2, mgr.getOutcomes().poll()); + assertTrue(mgr.getOutcomes().isEmpty()); + } + + @Test + public void testContains_testGetProperty_testSetProperty_testRemoveProperty() { + mgr.setProperty("abc", "a string"); + mgr.setProperty(MY_KEY, 100); + + assertTrue(mgr.contains(MY_KEY)); + assertFalse(mgr.contains("ghi")); + + String strValue = mgr.getProperty("abc"); + assertEquals("a string", strValue); + + int intValue = mgr.getProperty(MY_KEY); + assertEquals(100, intValue); + + mgr.removeProperty(MY_KEY); + assertFalse(mgr.contains(MY_KEY)); + } + + @Test + public void testToString() { + assertNotNull(mgr.toString()); + } + + private void loadPolicy(String fileName) throws CoderException { + ToscaServiceTemplate template = + yamlCoder.decode(ResourceUtils.getResourceAsString(fileName), ToscaServiceTemplate.class); + tosca = template.getToscaTopologyTemplate().getPolicies().get(0).values().iterator().next(); + + params.setToscaPolicy(tosca); + } + + private void runExecutor() { + ArgumentCaptor<Runnable> runCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(executor).execute(runCaptor.capture()); + + runCaptor.getValue().run(); + } + + + private static class MyManager extends ControlLoopEventManager { + private static final long serialVersionUID = 1L; + + private static ExecutorService executor; + private static List<LockImpl> locks; + + public MyManager(ControlLoopParams params, UUID requestId) + throws ControlLoopException { + super(params, requestId); + } + + @Override + protected ExecutorService getBlockingExecutor() { + return executor; + } + + @Override + protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) { + LockImpl lock = new LockImpl(LockState.ACTIVE, targetEntity, requestId, holdSec, callback); + locks.add(lock); + callback.lockAvailable(lock); + } + } +} diff --git a/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java new file mode 100644 index 000000000..1472adc18 --- /dev/null +++ b/controlloop/common/eventmanager/src/test/java/org/onap/policy/controlloop/eventmanager/StepTest.java @@ -0,0 +1,380 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.Operator; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.spi.Actor; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.onap.policy.controlloop.policy.Target; +import org.onap.policy.controlloop.policy.TargetType; + +public class StepTest { + private static final UUID REQ_ID = UUID.randomUUID(); + private static final String POLICY_ID = "my-policy"; + private static final String POLICY_ACTOR = "my-actor"; + private static final String POLICY_OPERATION = "my-operation"; + private static final String MY_TARGET = "my-target"; + private static final String PAYLOAD_KEY = "payload-key"; + private static final String PAYLOAD_VALUE = "payload-value"; + private static final long REMAINING_MS = 5000; + private static final Integer POLICY_RETRY = 3; + private static final Integer POLICY_TIMEOUT = 20; + private static final String EXPECTED_EXCEPTION = "expected exception"; + + @Mock + private Operator policyOperator; + @Mock + private Operation policyOperation; + @Mock + private Actor policyActor; + @Mock + private ActorService actors; + + private CompletableFuture<OperationOutcome> future; + private Target target; + private Map<String, String> payload; + private Policy policy; + private VirtualControlLoopEvent event; + private ControlLoopEventContext context; + private BlockingQueue<OperationOutcome> starts; + private BlockingQueue<OperationOutcome> completions; + private ControlLoopOperationParams params; + private AtomicReference<Instant> startTime; + private Step step; + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + future = new CompletableFuture<>(); + + // configure policy operation + when(actors.getActor(POLICY_ACTOR)).thenReturn(policyActor); + when(policyActor.getOperator(POLICY_OPERATION)).thenReturn(policyOperator); + when(policyOperator.buildOperation(any())).thenReturn(policyOperation); + when(policyOperation.start()).thenReturn(future); + + target = new Target(); + target.setType(TargetType.VM); + + payload = Map.of(PAYLOAD_KEY, PAYLOAD_VALUE); + + policy = new Policy(); + policy.setId(POLICY_ID); + policy.setActor(POLICY_ACTOR); + policy.setRecipe(POLICY_OPERATION); + policy.setTarget(target); + policy.setPayload(payload); + policy.setRetry(POLICY_RETRY); + policy.setTimeout(POLICY_TIMEOUT); + + event = new VirtualControlLoopEvent(); + event.setRequestId(REQ_ID); + event.setTarget(ControlLoopOperationManager2.VSERVER_VSERVER_NAME); + event.setAai(new TreeMap<>(Map.of(ControlLoopOperationManager2.VSERVER_VSERVER_NAME, MY_TARGET))); + + context = new ControlLoopEventContext(event); + + starts = new LinkedBlockingQueue<>(); + completions = new LinkedBlockingQueue<>(); + + params = ControlLoopOperationParams.builder().actor(POLICY_ACTOR).actorService(actors) + .completeCallback(completions::add).context(context).executor(ForkJoinPool.commonPool()) + .operation(POLICY_OPERATION).payload(new TreeMap<>(payload)).startCallback(starts::add) + .target(target).targetEntity(MY_TARGET).build(); + + startTime = new AtomicReference<>(); + + step = new Step(params, startTime); + } + + @Test + public void testConstructor() { + assertTrue(step.isPolicyStep()); + assertSame(params, step.getParams()); + + // check that it recorded the startTime by starting and checking it + step.init(); + step.start(REMAINING_MS); + + assertNotNull(startTime.get()); + + // try with null start time + assertThatThrownBy(() -> new Step(params, null)).isInstanceOf(NullPointerException.class) + .hasMessageContaining("startTime"); + } + + @Test + public void testConstructorWithOtherStep_testInitStartTime_testGetStartTimeRef() { + Step step2 = new Step(step, "actorB", "operB"); + assertFalse(step2.isPolicyStep()); + + ControlLoopOperationParams params2 = step2.getParams(); + assertEquals("actorB", params2.getActor()); + assertEquals("operB", params2.getOperation()); + assertNull(params2.getRetry()); + assertNull(params2.getTimeoutSec()); + assertSame(target, params2.getTarget()); + assertEquals(MY_TARGET, params2.getTargetEntity()); + assertTrue(params2.getPayload().isEmpty()); + + when(actors.getActor(params2.getActor())).thenReturn(policyActor); + when(policyActor.getOperator(params2.getOperation())).thenReturn(policyOperator); + + assertNull(step2.getStartTime()); + + // check that it recorded the startTime by starting and checking it + step2.init(); + step2.start(REMAINING_MS); + + Instant instant = startTime.get(); + assertNotNull(instant); + assertSame(instant, step2.getStartTime()); + + // launch the original step, too, so we can test the other branch of + // initStartTime() + step.init(); + step.start(REMAINING_MS); + + assertSame(instant, startTime.get()); + assertSame(instant, step.getStartTime()); + } + + @Test + public void testGetActorName_testGetOperationName() { + assertEquals(POLICY_ACTOR, step.getActorName()); + assertEquals(POLICY_OPERATION, step.getOperationName()); + } + + @Test + public void testIsInitialized_testInit_testGetOperation() { + assertFalse(step.isInitialized()); + + // verify it's unchanged + assertFalse(step.isInitialized()); + + assertNull(step.getOperation()); + + step.init(); + + assertSame(policyOperation, step.getOperation()); + assertTrue(step.isInitialized()); + + // repeat - should be unchanged + step.init(); + assertSame(policyOperation, step.getOperation()); + assertTrue(step.isInitialized()); + + // repeat without init - should be unchanged + assertSame(policyOperation, step.getOperation()); + assertTrue(step.isInitialized()); + } + + @Test + public void testStart() { + assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS)) + .withMessage("step has not been initialized"); + + // initialize it, by calling getOperation(), and then try again + step.init(); + assertTrue(step.start(REMAINING_MS)); + + assertNotNull(startTime.get()); + + // should fail if we try again + assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS)) + .withMessage("step is already running"); + } + + /** + * Tests start() when the operation.start() throws an exception. + */ + @Test + public void testStartException() { + when(policyOperation.start()).thenThrow(new RuntimeException()); + step.init(); + + assertTrue(step.start(REMAINING_MS)); + + // exception should be immediate + OperationOutcome outcome = completions.poll(); + assertNotNull(outcome); + + assertNotEquals(PolicyResult.SUCCESS, outcome.getResult()); + assertEquals(POLICY_ACTOR, outcome.getActor()); + assertTrue(outcome.isFinalOutcome()); + } + + /** + * Tests start() when the operation throws an asynchronous exception. + */ + @Test + public void testStartAsyncException() { + step.init(); + step.start(REMAINING_MS); + + future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION)); + + // exception should be immediate + OperationOutcome outcome = completions.poll(); + assertNotNull(outcome); + + assertNotEquals(PolicyResult.SUCCESS, outcome.getResult()); + assertEquals(POLICY_ACTOR, outcome.getActor()); + assertTrue(outcome.isFinalOutcome()); + } + + /** + * Tests handleException() when the exception is a CancellationException. + */ + @Test + public void testHandleExceptionCancellationException() { + step.init(); + step.start(REMAINING_MS); + + future.completeExceptionally(new CancellationException(EXPECTED_EXCEPTION)); + + // should not have generated an outcome + assertNull(completions.peek()); + } + + @Test + public void testHandleExceptionCauseCancellationException() { + step.init(); + step.start(REMAINING_MS); + + future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION, new CancellationException())); + + // should not have generated an outcome + assertNull(completions.peek()); + } + + @Test + public void testHandleException() { + when(policyOperation.start()).thenThrow(new RuntimeException()); + + step.init(); + + assertTrue(step.start(REMAINING_MS)); + + // exception should be immediate + OperationOutcome outcome = completions.poll(); + assertNotNull(outcome); + + assertNotEquals(PolicyResult.SUCCESS, outcome.getResult()); + assertEquals(POLICY_ACTOR, outcome.getActor()); + assertTrue(outcome.isFinalOutcome()); + assertEquals(POLICY_OPERATION, outcome.getOperation()); + assertSame(startTime.get(), outcome.getStart()); + assertNotNull(outcome.getEnd()); + assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond()); + } + + @Test + public void testHandleTimeout() throws InterruptedException { + step.init(); + + long tstart = System.currentTimeMillis(); + + // give it a short timeout + step.start(100); + + OperationOutcome outcome = completions.poll(5, TimeUnit.SECONDS); + assertNotNull(outcome); + + // should not have timed out before 100ms + assertTrue(tstart + 100 <= System.currentTimeMillis()); + + // must wait for the future to complete before checking that it was cancelled + assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)).isInstanceOf(Exception.class); + + // verify that the future was cancelled + assertTrue(future.isCancelled()); + + assertNotEquals(PolicyResult.SUCCESS, outcome.getResult()); + assertEquals(ActorConstants.CL_TIMEOUT_ACTOR, outcome.getActor()); + assertTrue(outcome.isFinalOutcome()); + assertNull(outcome.getOperation()); + assertSame(startTime.get(), outcome.getStart()); + assertNotNull(outcome.getEnd()); + assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond()); + } + + @Test + public void testCancel() { + // should have no effect + step.cancel(); + + step.init(); + + step.start(REMAINING_MS); + step.cancel(); + + assertTrue(future.isCancelled()); + } + + @Test + public void testBuildOperation() { + assertSame(policyOperation, step.buildOperation()); + } + + @Test + public void testToString() { + assertNotNull(step.toString()); + } +} |