diff options
author | Jim Hahn <jrh3@att.com> | 2020-08-20 10:11:54 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-08-20 11:00:06 -0400 |
commit | 56efff004af2d1be64c67f7c8091cb4553a0e86b (patch) | |
tree | e75f3248c9a25f62a5d2a9483616cfafad4927ad /controlloop/common/eventmanager/src/main | |
parent | 6656a9fc98c178351122fd326940fbb6923d292f (diff) |
Add generic eventmanager classes
Added classes that are event-agnostic and support moving control from
java into rules.
Updates per review comments:
- removed policy scope
Issue-ID: POLICY-2748-event-mgr
Change-Id: Icf811cc25a3975543fc5c725766b7b9df2bb87b0
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'controlloop/common/eventmanager/src/main')
4 files changed, 656 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java new file mode 100644 index 000000000..2591a3fa0 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ActorConstants.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +public class ActorConstants { + public static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-"; + public static final String LOCK_ACTOR = "LOCK"; + public static final String LOCK_OPERATION = "Lock"; + + public static final String PAYLOAD_KEY_VF_COUNT = "vfCount"; + + + private ActorConstants() { + // do nothing + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java new file mode 100644 index 000000000..5cf087abd --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java @@ -0,0 +1,297 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.ToString; +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager; +import org.onap.policy.controlloop.processor.ControlLoopProcessor; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for a single event. Once this has been created, the event can be retracted from + * working memory. Invoke {@link #isActive()} to determine if the manager is active (i.e., + * hasn't been replicated from another server). When the manager is no longer needed, + * {@link #destroy()} should be invoked. + */ +@ToString(onlyExplicitlyIncluded = true) +public class ControlLoopEventManager implements StepContext, Serializable { + + private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class); + private static final long serialVersionUID = -1216568161322872641L; + + private static final String EVENT_MANAGER_SERVICE_CONFIG = "event-manager"; + + /** + * Counts the number of these objects that have been created. This is used by junit + * tests. + */ + private static final AtomicLong createCount = new AtomicLong(0); + + /** + * {@code True} if this object was created by this JVM instance, {@code false} + * otherwise. This will be {@code false} if this object is reconstituted from a + * persistent store or by transfer from another server. + */ + private transient boolean createdByThisJvmInstance; + + @Getter + @ToString.Include + public final String closedLoopControlName; + @Getter + @ToString.Include + private final UUID requestId; + + /** + * Time, in milliseconds, when the control loop will time out. + */ + @Getter + private final long endTimeMs; + + // fields extracted from the ControlLoopParams + @Getter + private final String policyName; + @Getter + private final String policyVersion; + + /** + * Maps a target entity to its lock. + */ + private final transient Map<String, LockData> target2lock = new HashMap<>(); + + @Getter(AccessLevel.PROTECTED) + private final ControlLoopProcessor processor; + + /** + * Set of properties used while processing the event. + */ + private Map<String, Serializable> properties = new ConcurrentHashMap<>(); + + /** + * Unprocessed outcomes from the operations. Outcomes are added to this each time the + * "start" or "complete" callback is invoked, typically by an operation running in a + * background thread, thus it must be thread safe. + */ + @Getter + private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>(); + + + /** + * Constructs the object. + * + * @param params control loop parameters + * @param requestId event request ID + * @throws ControlLoopException if the event is invalid or if a YAML processor cannot + * be created + */ + public ControlLoopEventManager(ControlLoopParams params, UUID requestId) throws ControlLoopException { + + createCount.incrementAndGet(); + + this.createdByThisJvmInstance = true; + this.closedLoopControlName = params.getClosedLoopControlName(); + this.requestId = requestId; + this.policyName = params.getPolicyName(); + this.policyVersion = params.getPolicyVersion(); + this.processor = new ControlLoopProcessor(params.getToscaPolicy()); + this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs(); + } + + /** + * Gets the number of manager objects that have been created. + * + * @return the number of manager objects that have been created + */ + public static long getCreateCount() { + return createCount.get(); + } + + /** + * Determines if the manager is still active. + * + * @return {@code true} if the manager is still active, {@code false} otherwise + */ + public boolean isActive() { + return createdByThisJvmInstance; + } + + /** + * Cancels the current operation and frees all locks. + */ + public void destroy() { + if (isActive()) { + getBlockingExecutor().execute(this::freeAllLocks); + } + } + + /** + * Frees all locks. + */ + private void freeAllLocks() { + target2lock.values().forEach(LockData::free); + } + + /** + * Determines the overall control loop timeout. + * + * @return the policy timeout, in milliseconds, if specified, a default timeout + * otherwise + */ + private long detmControlLoopTimeoutMs() { + // validation checks preclude null or 0 timeout values in the policy + Integer timeout = processor.getControlLoop().getTimeout(); + return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS); + } + + /** + * Requests a lock. This requests the lock for the time that remains before the + * timeout expires. This avoids having to extend the lock. + * + * @param targetEntity entity to be locked + * @return a future that can be used to await the lock + */ + @Override + public synchronized CompletableFuture<OperationOutcome> requestLock(String targetEntity) { + + long remainingMs = endTimeMs - System.currentTimeMillis(); + int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS)); + + LockData data = target2lock.computeIfAbsent(targetEntity, key -> { + LockData data2 = new LockData(key, requestId); + makeLock(targetEntity, requestId.toString(), remainingSec, data2); + + data2.addUnavailableCallback(this::onComplete); + + return data2; + }); + + return data.getFuture(); + } + + public void onStart(OperationOutcome outcome) { + outcomes.add(outcome); + } + + public void onComplete(OperationOutcome outcome) { + outcomes.add(outcome); + } + + /** + * Determines if the context contains a property. + * + * @param name name of the property of interest + * @return {@code true} if the context contains the property, {@code false} otherwise + */ + public boolean contains(String name) { + return properties.containsKey(name); + } + + /** + * Gets a property, casting it to the desired type. + * + * @param <T> desired type + * @param name name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not yet have a value + */ + @SuppressWarnings("unchecked") + public <T> T getProperty(String name) { + return (T) properties.get(name); + } + + /** + * Sets a property's value. + * + * @param name property name + * @param value new property value + */ + public void setProperty(String name, Serializable value) { + logger.error("set property {}={} manager={}", name, value, this); + properties.put(name, value); + } + + /** + * Removes a property. + * + * @param name property name + */ + public void removeProperty(String name) { + properties.remove(name); + } + + /** + * Initializes various components, on demand. + */ + private static class LazyInitData { + private static final OperationHistoryDataManager DATA_MANAGER; + private static final ActorService ACTOR_SERVICE; + + static { + // TODO how to dynamically change data manager, depending whether or not + // guards are enabled? + EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG); + ACTOR_SERVICE = services.getActorService(); + DATA_MANAGER = services.getDataManager(); + } + } + + // the following methods may be overridden by junit tests + + public Executor getExecutor() { + return ForkJoinPool.commonPool(); + } + + protected ExecutorService getBlockingExecutor() { + return PolicyEngineConstants.getManager().getExecutorService(); + } + + protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) { + PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false); + } + + public ActorService getActorService() { + return LazyInitData.ACTOR_SERVICE; + } + + public OperationHistoryDataManager getDataManager() { + return LazyInitData.DATA_MANAGER; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java new file mode 100644 index 000000000..01c64e5bc --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/Step.java @@ -0,0 +1,253 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.Getter; +import lombok.NonNull; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A single step within a single policy. The rules typically break a policy operation down + * into separate steps. For instance, a policy for VF-Module-Create would be broken down + * into lock target, A&AI tenant query, A&AI custom query, guard, and finally + * VF-Module-Create. + */ +public class Step { + private static final Logger logger = LoggerFactory.getLogger(Step.class); + + @Getter + protected ControlLoopOperationParams params; + + /** + * Time when the first step started for the current policy. This is shared by all + * steps for the policy. When a step is started and finds this to be {@code null}, it + * sets the value. Subsequent steps leave it unchanged. + */ + private final AtomicReference<Instant> startTime; + + /** + * {@code True} if this step is for the policy's actual operation, {@code false} if it's a preprocessor step. + */ + @Getter + private final boolean policyStep; + + /** + * The operation for this step. + */ + @Getter + private Operation operation = null; + + /** + * Used to cancel the running operation. + */ + protected CompletableFuture<OperationOutcome> future; + + + /** + * Constructs the object. This is used when constructing the step for the policy's actual operation. + * + * @param params operation parameters + * @param startTime start time of the first step for the current policy, initially + * containing {@code null} + */ + public Step(ControlLoopOperationParams params, @NonNull AtomicReference<Instant> startTime) { + this.params = params; + this.startTime = startTime; + this.policyStep = true; + } + + /** + * Constructs the object using information from another step. This is used when constructing a preprocessing + * step. + * + * @param otherStep step whose information should be used + * @param actor actor name + * @param operation operation name + */ + public Step(Step otherStep, String actor, String operation) { + this.params = otherStep.params.toBuilder().actor(actor).operation(operation).retry(null).timeoutSec(null) + .payload(new LinkedHashMap<>()).build(); + this.startTime = otherStep.startTime; + this.policyStep = false; + } + + public String getActorName() { + return params.getActor(); + } + + public String getOperationName() { + return params.getOperation(); + } + + /** + * Determines if the operation has been initialized (i.e., created). + * + * @return {@code true} if the operation has been initialized, {@code false} otherwise + */ + public boolean isInitialized() { + return (operation != null); + } + + /** + * Initializes the step, creating the operation if it has not yet been created. + */ + public void init() { + if (operation == null) { + operation = buildOperation(); + } + } + + /** + * Starts the operation. + * + * @param remainingMs time remaining, in milliseconds, for the control loop + * @return {@code true} if started, {@code false} if the step is no longer necessary + * (i.e., because it was previously completed) + */ + public boolean start(long remainingMs) { + if (!isInitialized()) { + throw new IllegalStateException("step has not been initialized"); + } + + if (future != null) { + throw new IllegalStateException("step is already running"); + } + + try { + initStartTime(); + future = operation.start(); + + // handle any exceptions that may be thrown, set timeout, and handle timeout + + // @formatter:off + future.exceptionally(this::handleException) + .orTimeout(remainingMs, TimeUnit.MILLISECONDS) + .exceptionally(this::handleTimeout); + // @formatter:on + + } catch (RuntimeException e) { + handleException(e); + } + + return true; + } + + /** + * Handles exceptions that may be generated. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleException(Throwable thrown) { + if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { + return null; + } + + logger.warn("{}.{}: exception starting operation for {}", params.getActor(), params.getOperation(), + params.getRequestId(), thrown); + OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown); + outcome.setStart(startTime.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + params.getCompleteCallback().accept(outcome); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Handles control loop timeout exception. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleTimeout(Throwable thrown) { + logger.warn("{}.{}: control loop timeout for {}", params.getActor(), params.getOperation(), + params.getRequestId(), thrown); + + OperationOutcome outcome = new PipelineUtil(params).setOutcome(params.makeOutcome(), thrown); + outcome.setActor(ActorConstants.CL_TIMEOUT_ACTOR); + outcome.setOperation(null); + outcome.setStart(startTime.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + params.getCompleteCallback().accept(outcome); + + // cancel the operation, if it's still running + future.cancel(false); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Cancels the operation, if it's running. + */ + public void cancel() { + if (future != null) { + future.cancel(false); + } + } + + /** + * Initializes the start time, if it's still unset. + */ + private void initStartTime() { + if (startTime.get() == null) { + startTime.set(Instant.now()); + } + } + + /** + * Gets the start time. + * + * @return the start time, or {@code null} if it hasn't been initialized yet + */ + public Instant getStartTime() { + return startTime.get(); + } + + /** + * Builds the operation. The default method simply invokes + * {@link ControlLoopOperationParams#build()}. + * + * @return a new operation + */ + protected Operation buildOperation() { + return params.build(); + } + + @Override + public String toString() { + return "Step(actor=" + getActorName() + ", operation=" + getOperationName() + ")"; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java new file mode 100644 index 000000000..5251b7acc --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/StepContext.java @@ -0,0 +1,72 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; + +/** + * Context used by steps to perform their work. + */ +public interface StepContext { + + /** + * Determines if the context contains a property. + * + * @param name name of the property of interest + * @return {@code true} if the context contains the property, {@code false} otherwise + */ + public boolean contains(String name); + + /** + * Gets a property, casting it to the desired type. + * + * @param <T> desired type + * @param name name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not yet have a value + */ + public <T> T getProperty(String name); + + /** + * Sets a property's value. + * + * @param name property name + * @param value new property value + */ + public void setProperty(String name, Serializable value); + + /** + * Removes a property. + * + * @param name property name + */ + public void removeProperty(String name); + + /** + * Requests a lock. This requests the lock for the time that remains before the + * timeout expires. This avoids having to extend the lock. + * + * @param targetEntity entity to be locked + * @return a future that can be used to await the lock + */ + public CompletableFuture<OperationOutcome> requestLock(String targetEntity); +} |