From c1f79cd311ad62d3adb374921b8c3d303db5add6 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Sat, 22 Feb 2020 17:11:20 -0500 Subject: Add frankfurt rules for Actor redesign Note: VcpeTest and VfwTest are not included, as they depend on updates to the APPC and APPC-LCM Actors. Added feature-controlloop-frankfurt. Added HTTP client property files to feature-controlloop-management. Updates per review comments: - pom changes - simplify FrankfurtBase - rename event-svc-http.properties - change "usescases" to "frankfurt" - use blanks for CDS property defaults - trailing spaces in http-client files - add https property to http-client files Added newlines to config files that appear to be missing them (based on feedback from gerrit). Issue-ID: POLICY-2385 Signed-off-by: Jim Hahn Change-Id: Ib4a4d75461c734ae47309e41dc9d099e8815d55d --- .../eventmanager/ControlLoopEventManager2.java | 614 ++++++++++++++++++ .../eventmanager/ControlLoopOperationManager2.java | 684 +++++++++++++++++++++ .../eventmanager/EventManagerServices.java | 179 ++++++ .../policy/controlloop/eventmanager/LockData.java | 181 ++++++ .../controlloop/eventmanager/ManagerContext.java | 64 ++ .../ophistory/OperationHistoryDataManager.java | 50 ++ .../ophistory/OperationHistoryDataManagerImpl.java | 295 +++++++++ .../OperationHistoryDataManagerParams.java | 80 +++ .../ophistory/OperationHistoryDataManagerStub.java | 45 ++ .../processor/ControlLoopProcessor.java | 9 +- .../policy/controlloop/utils/ControlLoopUtils.java | 210 +++++++ 11 files changed, 2407 insertions(+), 4 deletions(-) create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager2.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager2.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/EventManagerServices.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockData.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ManagerContext.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManager.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerParams.java create mode 100644 controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerStub.java (limited to 'controlloop/common/eventmanager/src/main') diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager2.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager2.java new file mode 100644 index 000000000..dc2b513a6 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager2.java @@ -0,0 +1,614 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import static org.onap.policy.controlloop.ControlLoopTargetType.PNF; +import static org.onap.policy.controlloop.ControlLoopTargetType.VM; +import static org.onap.policy.controlloop.ControlLoopTargetType.VNF; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.Getter; +import lombok.ToString; +import org.apache.commons.lang3.StringUtils; +import org.drools.core.WorkingMemory; +import org.kie.api.runtime.rule.FactHandle; +import org.onap.policy.controlloop.ControlLoopEventStatus; +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.ControlLoopNotificationType; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.VirtualControlLoopNotification; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager; +import org.onap.policy.controlloop.policy.FinalResult; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.processor.ControlLoopProcessor; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for a single control loop event. Once this has been created, the event can be + * retracted from working memory. Once this has been created, {@link #start()} should be + * invoked, and then {@link #nextStep()} should be invoked continually until + * {@link #isActive()} returns {@code false}, indicating that all steps have completed. + */ +@ToString(onlyExplicitlyIncluded = true) +public class ControlLoopEventManager2 implements ManagerContext, Serializable { + private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager2.class); + private static final long serialVersionUID = -1216568161322872641L; + + private static final String EVENT_MANAGER_SERVICE_CONFIG = "config/event-manager.properties"; + public static final String PROV_STATUS_ACTIVE = "ACTIVE"; + private static final String VM_NAME = "VM_NAME"; + private static final String VNF_NAME = "VNF_NAME"; + public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id"; + public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name"; + public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name"; + public static final String GENERIC_VNF_IS_CLOSED_LOOP_DISABLED = "generic-vnf.is-closed-loop-disabled"; + public static final String VSERVER_IS_CLOSED_LOOP_DISABLED = "vserver.is-closed-loop-disabled"; + public static final String PNF_IS_IN_MAINT = "pnf.in-maint"; + public static final String GENERIC_VNF_PROV_STATUS = "generic-vnf.prov-status"; + public static final String VSERVER_PROV_STATUS = "vserver.prov-status"; + public static final String PNF_ID = "pnf.pnf-id"; + public static final String PNF_NAME = "pnf.pnf-name"; + + private static final Set VALID_TARGETS = Stream + .of(VM_NAME, VNF_NAME, VSERVER_VSERVER_NAME, GENERIC_VNF_VNF_ID, GENERIC_VNF_VNF_NAME, PNF_NAME) + .map(String::toLowerCase).collect(Collectors.toSet()); + + private static final Set TRUE_VALUES = Set.of("true", "t", "yes", "y"); + + public enum NewEventStatus { + FIRST_ONSET, SUBSEQUENT_ONSET, FIRST_ABATEMENT, SUBSEQUENT_ABATEMENT, SYNTAX_ERROR + } + + // TODO limit the number of policies that may be executed for a single event? + + /** + * {@code True} if this object was created by this JVM instance, {@code false} + * otherwise. This will be {@code false} if this object is reconstituted from a + * persistent store or by transfer from another server. + */ + private transient boolean createdByThisJvmInstance; + + @Getter + @ToString.Include + public final String closedLoopControlName; + @Getter + @ToString.Include + private final UUID requestId; + private final ControlLoopEventContext context; + @ToString.Include + private int numOnsets = 1; + @ToString.Include + private int numAbatements = 0; + private VirtualControlLoopEvent abatement = null; + + /** + * Time, in milliseconds, when the control loop will time out. + */ + @Getter + private final long endTimeMs; + + // fields extracted from the ControlLoopParams + @Getter + private final String policyName; + private final String policyScope; + private final String policyVersion; + + private final LinkedList controlLoopHistory = new LinkedList<>(); + + /** + * Maps a target entity to its lock. + */ + private final transient Map target2lock = new HashMap<>(); + + private final ControlLoopProcessor processor; + private final AtomicReference currentOperation = new AtomicReference<>(); + + private FinalResult finalResult = null; + + @Getter + private VirtualControlLoopNotification notification; + + @Getter + private boolean updated = false; + + private final transient WorkingMemory workMem; + private transient FactHandle factHandle; + + + /** + * Constructs the object. + * + * @param params control loop parameters + * @param event event to be managed by this object + * @param workMem working memory to update if this changes + * @throws ControlLoopException if the event is invalid or if a YAML processor cannot + * be created + */ + public ControlLoopEventManager2(ControlLoopParams params, VirtualControlLoopEvent event, WorkingMemory workMem) + throws ControlLoopException { + + checkEventSyntax(event); + + if (isClosedLoopDisabled(event)) { + throw new IllegalStateException("is-closed-loop-disabled is set to true on VServer or VNF"); + } + + if (isProvStatusInactive(event)) { + throw new IllegalStateException("prov-status is not ACTIVE on VServer or VNF"); + } + + this.createdByThisJvmInstance = true; + this.closedLoopControlName = params.getClosedLoopControlName(); + this.requestId = event.getRequestId(); + this.context = new ControlLoopEventContext(event); + this.policyName = params.getPolicyName(); + this.policyScope = params.getPolicyScope(); + this.policyVersion = params.getPolicyVersion(); + this.processor = new ControlLoopProcessor(params.getToscaPolicy()); + this.workMem = workMem; + this.endTimeMs = System.currentTimeMillis() + detmControlLoopTimeoutMs(); + } + + /** + * Starts the manager. + * + * @throws ControlLoopException if the processor cannot get a policy + */ + public void start() throws ControlLoopException { + if (!isActive()) { + throw new IllegalStateException("manager is no longer active"); + } + + if ((factHandle = workMem.getFactHandle(this)) == null) { + throw new IllegalStateException("manager is not in working memory"); + } + + if (currentOperation.get() != null) { + throw new IllegalStateException("manager already started"); + } + + startOperation(); + } + + /** + * Starts an operation for the current processor policy. + * + * @throws ControlLoopException if the processor cannot get a policy + */ + private synchronized void startOperation() throws ControlLoopException { + + if ((finalResult = processor.checkIsCurrentPolicyFinal()) == null) { + // not final - start the next operation + currentOperation.set(makeOperationManager(context, processor.getCurrentPolicy())); + currentOperation.get().start(endTimeMs - System.currentTimeMillis()); + return; + } + + logger.info("final={} oper state={} for {}", finalResult, currentOperation.get().getState(), requestId); + + notification = makeNotification(); + notification.setHistory(controlLoopHistory); + + switch (finalResult) { + case FINAL_FAILURE_EXCEPTION: + notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); + notification.setMessage("Exception in processing closed loop"); + break; + case FINAL_SUCCESS: + notification.setNotification(ControlLoopNotificationType.FINAL_SUCCESS); + break; + case FINAL_OPENLOOP: + notification.setNotification(ControlLoopNotificationType.FINAL_OPENLOOP); + break; + case FINAL_FAILURE: + default: + notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); + break; + } + } + + /** + * Starts the next step, whatever that may be. + */ + public void nextStep() { + if (!isActive()) { + return; + } + + updated = false; + + try { + if (!currentOperation.get().nextStep()) { + // current operation is done - try the next + controlLoopHistory.addAll(currentOperation.get().getHistory()); + processor.nextPolicyForResult(currentOperation.get().getOperationResult()); + startOperation(); + } + + } catch (ControlLoopException | RuntimeException e) { + // processor problem - this is fatal + logger.warn("{}: cannot start next step for {}", closedLoopControlName, requestId, e); + notification = makeNotification(); + notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); + notification.setMessage("Policy processing aborted due to policy error"); + notification.setHistory(controlLoopHistory); + finalResult = FinalResult.FINAL_FAILURE_EXCEPTION; + } + } + + /** + * Determines if the manager is still active. + * + * @return {@code true} if the manager is still active, {@code false} otherwise + */ + public boolean isActive() { + return (createdByThisJvmInstance && finalResult == null); + } + + /** + * Updates working memory if this changes. + * + * @param operation operation manager that was updated + */ + @Override + public synchronized void updated(ControlLoopOperationManager2 operation) { + if (!isActive() || operation != currentOperation.get()) { + // no longer working on the given operation + return; + } + + notification = makeNotification(); + + VirtualControlLoopEvent event = context.getEvent(); + + notification.setHistory(operation.getHistory()); + + switch (operation.getState()) { + case LOCK_DENIED: + notification.setNotification(ControlLoopNotificationType.REJECTED); + notification.setMessage("The target " + event.getAai().get(event.getTarget()) + " is already locked"); + break; + case LOCK_LOST: + notification.setNotification(ControlLoopNotificationType.OPERATION_FAILURE); + notification.setMessage("The target " + event.getAai().get(event.getTarget()) + " is no longer locked"); + break; + case GUARD_STARTED: + notification.setNotification(ControlLoopNotificationType.OPERATION); + notification.setMessage( + "Sending guard query for " + operation.getActor() + " " + operation.getOperation()); + break; + case GUARD_PERMITTED: + notification.setNotification(ControlLoopNotificationType.OPERATION); + notification.setMessage("Guard result for " + operation.getActor() + " " + operation.getOperation() + + " is Permit"); + break; + case GUARD_DENIED: + notification.setNotification(ControlLoopNotificationType.OPERATION); + notification.setMessage("Guard result for " + operation.getActor() + " " + operation.getOperation() + + " is Deny"); + break; + case OPERATION_SUCCESS: + notification.setNotification(ControlLoopNotificationType.OPERATION_SUCCESS); + break; + + case CONTROL_LOOP_TIMEOUT: + logger.warn("{}: control loop timed out for {}", closedLoopControlName, requestId); + controlLoopHistory.addAll(currentOperation.get().getHistory()); + notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); + notification.setMessage("Control Loop timed out"); + notification.setHistory(controlLoopHistory); + finalResult = FinalResult.FINAL_FAILURE; + break; + + case OPERATION_FAILURE: + default: + notification.setNotification(ControlLoopNotificationType.OPERATION_FAILURE); + break; + } + + updated = true; + workMem.update(factHandle, this); + } + + /** + * Cancels the current operation and frees all locks. + */ + public void destroy() { + ControlLoopOperationManager2 oper = currentOperation.get(); + if (oper != null) { + oper.cancel(); + } + + getBlockingExecutor().execute(this::freeAllLocks); + } + + /** + * Frees all locks. + */ + private void freeAllLocks() { + target2lock.values().forEach(LockData::free); + } + + /** + * Makes a notification message for the current operation. + * + * @return a new notification + */ + public VirtualControlLoopNotification makeNotification() { + VirtualControlLoopNotification notif = new VirtualControlLoopNotification(); + notif.setNotification(ControlLoopNotificationType.OPERATION); + notif.setFrom("policy"); + notif.setPolicyScope(policyScope); + notif.setPolicyVersion(policyVersion); + + if (finalResult == null) { + ControlLoopOperationManager2 oper = currentOperation.get(); + if (oper != null) { + notif.setMessage(oper.getOperationMessage()); + notif.setHistory(oper.getHistory()); + } + } + + return notif; + } + + /** + * An event onset/abatement. + * + * @param event the event + * @return the status + */ + public NewEventStatus onNewEvent(VirtualControlLoopEvent event) { + try { + checkEventSyntax(event); + + if (event.getClosedLoopEventStatus() == ControlLoopEventStatus.ONSET) { + if (event.equals(context.getEvent())) { + return NewEventStatus.FIRST_ONSET; + } + + numOnsets++; + return NewEventStatus.SUBSEQUENT_ONSET; + + } else { + if (abatement == null) { + abatement = event; + numAbatements++; + return NewEventStatus.FIRST_ABATEMENT; + } else { + numAbatements++; + return NewEventStatus.SUBSEQUENT_ABATEMENT; + } + } + } catch (ControlLoopException e) { + logger.error("{}: onNewEvent threw an exception", this, e); + return NewEventStatus.SYNTAX_ERROR; + } + } + + /** + * Determines the overall control loop timeout. + * + * @return the policy timeout, in milliseconds, if specified, a default timeout + * otherwise + */ + private long detmControlLoopTimeoutMs() { + // validation checks preclude null or 0 timeout values in the policy + Integer timeout = processor.getControlLoop().getTimeout(); + return TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS); + } + + /** + * Check an event syntax. + * + * @param event the event syntax + * @throws ControlLoopException if an error occurs + */ + public void checkEventSyntax(VirtualControlLoopEvent event) throws ControlLoopException { + validateStatus(event); + if (StringUtils.isBlank(event.getClosedLoopControlName())) { + throw new ControlLoopException("No control loop name"); + } + if (event.getRequestId() == null) { + throw new ControlLoopException("No request ID"); + } + if (event.getClosedLoopEventStatus() == ControlLoopEventStatus.ABATED) { + return; + } + if (StringUtils.isBlank(event.getTarget())) { + throw new ControlLoopException("No target field"); + } else if (!VALID_TARGETS.contains(event.getTarget().toLowerCase())) { + throw new ControlLoopException("target field invalid"); + } + validateAaiData(event); + } + + private void validateStatus(VirtualControlLoopEvent event) throws ControlLoopException { + if (event.getClosedLoopEventStatus() != ControlLoopEventStatus.ONSET + && event.getClosedLoopEventStatus() != ControlLoopEventStatus.ABATED) { + throw new ControlLoopException("Invalid value in closedLoopEventStatus"); + } + } + + private void validateAaiData(VirtualControlLoopEvent event) throws ControlLoopException { + Map eventAai = event.getAai(); + if (eventAai == null) { + throw new ControlLoopException("AAI is null"); + } + if (event.getTargetType() == null) { + throw new ControlLoopException("The Target type is null"); + } + switch (event.getTargetType()) { + case VM: + case VNF: + validateAaiVmVnfData(eventAai); + return; + case PNF: + validateAaiPnfData(eventAai); + return; + default: + throw new ControlLoopException("The target type is not supported"); + } + } + + private void validateAaiVmVnfData(Map eventAai) throws ControlLoopException { + if (eventAai.get(GENERIC_VNF_VNF_ID) == null && eventAai.get(VSERVER_VSERVER_NAME) == null + && eventAai.get(GENERIC_VNF_VNF_NAME) == null) { + throw new ControlLoopException( + "generic-vnf.vnf-id or generic-vnf.vnf-name or vserver.vserver-name information missing"); + } + } + + private void validateAaiPnfData(Map eventAai) throws ControlLoopException { + if (eventAai.get(PNF_NAME) == null) { + throw new ControlLoopException("AAI PNF object key pnf-name is missing"); + } + } + + /** + * Is closed loop disabled for an event. + * + * @param event the event + * @return true if the control loop is disabled, false + * otherwise + */ + public static boolean isClosedLoopDisabled(VirtualControlLoopEvent event) { + Map aai = event.getAai(); + return (isAaiTrue(aai.get(VSERVER_IS_CLOSED_LOOP_DISABLED)) + || isAaiTrue(aai.get(GENERIC_VNF_IS_CLOSED_LOOP_DISABLED)) + || isAaiTrue(aai.get(PNF_IS_IN_MAINT))); + } + + /** + * Does provisioning status, for an event, have a value other than ACTIVE. + * + * @param event the event + * @return {@code true} if the provisioning status is neither ACTIVE nor {@code null}, + * {@code false} otherwise + */ + protected static boolean isProvStatusInactive(VirtualControlLoopEvent event) { + Map aai = event.getAai(); + return !(PROV_STATUS_ACTIVE.equals(aai.getOrDefault(VSERVER_PROV_STATUS, PROV_STATUS_ACTIVE)) + && PROV_STATUS_ACTIVE.equals(aai.getOrDefault(GENERIC_VNF_PROV_STATUS, PROV_STATUS_ACTIVE))); + } + + /** + * Determines the boolean value represented by the given AAI field value. + * + * @param aaiValue value to be examined + * @return the boolean value represented by the field value, or {@code false} if the + * value is {@code null} + */ + protected static boolean isAaiTrue(String aaiValue) { + return (aaiValue != null && TRUE_VALUES.contains(aaiValue.toLowerCase())); + } + + /** + * Requests a lock. This requests the lock for the time that remains before the + * timeout expires. This avoids having to extend the lock. + * + * @param targetEntity entity to be locked + * @param lockUnavailableCallback function to be invoked if the lock is + * unavailable/lost + * @return a future that can be used to await the lock + */ + @Override + public synchronized CompletableFuture requestLock(String targetEntity, + Consumer lockUnavailableCallback) { + + long remainingMs = endTimeMs - System.currentTimeMillis(); + int remainingSec = 15 + Math.max(0, (int) TimeUnit.SECONDS.convert(remainingMs, TimeUnit.MILLISECONDS)); + + LockData data = target2lock.computeIfAbsent(targetEntity, key -> { + LockData data2 = new LockData(key, requestId); + makeLock(targetEntity, requestId.toString(), remainingSec, data2); + return data2; + }); + + data.addUnavailableCallback(lockUnavailableCallback); + + return data.getFuture(); + } + + /** + * Initializes various components, on demand. + */ + private static class LazyInitData { + private static final OperationHistoryDataManager DATA_MANAGER; + private static final ActorService ACTOR_SERVICE; + + static { + EventManagerServices services = new EventManagerServices(EVENT_MANAGER_SERVICE_CONFIG); + ACTOR_SERVICE = services.getActorService(); + DATA_MANAGER = services.getDataManager(); + } + } + + // the following methods may be overridden by junit tests + + protected ControlLoopOperationManager2 makeOperationManager(ControlLoopEventContext ctx, Policy policy) { + return new ControlLoopOperationManager2(this, ctx, policy, getExecutor()); + } + + protected Executor getExecutor() { + return ForkJoinPool.commonPool(); + } + + protected ExecutorService getBlockingExecutor() { + return PolicyEngineConstants.getManager().getExecutorService(); + } + + protected void makeLock(String targetEntity, String requestId, int holdSec, LockCallback callback) { + PolicyEngineConstants.getManager().createLock(targetEntity, requestId, holdSec, callback, false); + } + + @Override + public ActorService getActorService() { + return LazyInitData.ACTOR_SERVICE; + } + + @Override + public OperationHistoryDataManager getDataManager() { + return LazyInitData.DATA_MANAGER; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager2.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager2.java new file mode 100644 index 000000000..6bdaa1575 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager2.java @@ -0,0 +1,684 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved. + * Modifications Copyright (C) 2019 Tech Mahindra + * Modifications Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.time.Instant; +import java.util.Deque; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.ToString; +import org.onap.policy.aai.AaiConstants; +import org.onap.policy.aai.AaiCqResponse; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages a single Operation for a single event. Once this has been created, + * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked + * continually until it returns {@code false}, indicating that all steps have completed. + */ +@ToString(onlyExplicitlyIncluded = true) +public class ControlLoopOperationManager2 implements Serializable { + private static final long serialVersionUID = -3773199283624595410L; + private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class); + private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-"; + public static final String LOCK_ACTOR = "LOCK"; + public static final String LOCK_OPERATION = "Lock"; + private static final String GUARD_ACTOR = "GUARD"; + public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name"; + public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name"; + public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id"; + public static final String PNF_NAME = "pnf.pnf-name"; + + // @formatter:off + public enum State { + ACTIVE, + LOCK_DENIED, + LOCK_LOST, + GUARD_STARTED, + GUARD_PERMITTED, + GUARD_DENIED, + OPERATION_SUCCESS, + OPERATION_FAILURE, + CONTROL_LOOP_TIMEOUT + } + // @formatter:on + + private final transient ManagerContext operContext; + private final transient ControlLoopEventContext eventContext; + private final Policy policy; + + @Getter + @ToString.Include + private State state = State.ACTIVE; + + @ToString.Include + private final String requestId; + + @ToString.Include + private final String policyId; + + /** + * Bumped each time the "complete" callback is invoked by the Actor, provided it's for + * this operation. + */ + @ToString.Include + private int attempts = 0; + + private final Deque operationHistory = new ConcurrentLinkedDeque<>(); + + /** + * Queue of outcomes yet to be processed. Outcomes are added to this each time the + * "start" or "complete" callback is invoked. + */ + @Getter(AccessLevel.PROTECTED) + private final transient Deque outcomes = new ConcurrentLinkedDeque<>(); + + /** + * Used to cancel the running operation. + */ + @Getter(AccessLevel.PROTECTED) + private transient CompletableFuture future = null; + + /** + * Target entity. Determined after the lock is granted, though it may require the + * custom query to be performed first. + */ + @Getter + private String targetEntity; + + @Getter(AccessLevel.PROTECTED) + private final transient ControlLoopOperationParams params; + private final transient PipelineUtil taskUtil; + + /** + * Time when the lock was first requested. + */ + private transient AtomicReference lockStart = new AtomicReference<>(); + + // values extracted from the policy + @Getter + private final String actor; + @Getter + private final String operation; + + + /** + * Construct an instance. + * + * @param operContext this operation's context + * @param context event context + * @param policy operation's policy + * @param executor executor for the Operation + */ + public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy, + Executor executor) { + + this.operContext = operContext; + this.eventContext = context; + this.policy = policy; + this.requestId = context.getEvent().getRequestId().toString(); + this.policyId = "" + policy.getId(); + this.actor = policy.getActor(); + this.operation = policy.getRecipe(); + + // @formatter:off + params = ControlLoopOperationParams.builder() + .actorService(operContext.getActorService()) + .actor(actor) + .operation(operation) + .context(context) + .executor(executor) + .target(policy.getTarget()) + .startCallback(this::onStart) + .completeCallback(this::onComplete) + .build(); + // @formatter:on + + taskUtil = new PipelineUtil(params); + } + + // + // Internal class used for tracking + // + @Getter + @ToString + private class Operation implements Serializable { + private static final long serialVersionUID = 1L; + + private int attempt; + private PolicyResult policyResult; + private ControlLoopOperation clOperation; + + /** + * Constructs the object. + * + * @param outcome outcome of the operation + */ + public Operation(OperationOutcome outcome) { + attempt = ControlLoopOperationManager2.this.attempts; + policyResult = outcome.getResult(); + clOperation = outcome.toControlLoopOperation(); + } + } + + /** + * Start the operation, first acquiring any locks that are needed. This should not + * throw any exceptions, but will, instead, invoke the callbacks with exceptions. + * + * @param remainingMs time remaining, in milliseconds, for the control loop + */ + @SuppressWarnings("unchecked") + public synchronized void start(long remainingMs) { + // this is synchronized while we update "future" + + try { + // provide a default, in case something fails before requestLock() is called + lockStart.set(Instant.now()); + + // @formatter:off + future = taskUtil.sequence( + this::detmTarget, + this::requestLock, + this::startOperation); + // @formatter:on + + // handle any exceptions that may be thrown, set timeout, and handle timeout + + // @formatter:off + future.exceptionally(this::handleException) + .orTimeout(remainingMs, TimeUnit.MILLISECONDS) + .exceptionally(this::handleTimeout); + // @formatter:on + + } catch (RuntimeException e) { + handleException(e); + } + } + + /** + * Start the operation, after the lock has been acquired. + * + * @return + */ + private CompletableFuture startOperation() { + // @formatter:off + ControlLoopOperationParams params2 = params.toBuilder() + .payload(new LinkedHashMap<>()) + .retry(policy.getRetry()) + .timeoutSec(policy.getTimeout()) + .targetEntity(targetEntity) + .build(); + // @formatter:on + + if (policy.getPayload() != null) { + params2.getPayload().putAll(policy.getPayload()); + } + + return params2.start(); + } + + /** + * Handles exceptions that may be generated. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleException(Throwable thrown) { + if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { + return null; + } + + logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown); + OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown); + outcome.setStart(lockStart.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + onComplete(outcome); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Handles control loop timeout exception. + * + * @param thrown exception that was generated + * @return {@code null} + */ + private OperationOutcome handleTimeout(Throwable thrown) { + logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown); + + OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown); + outcome.setActor(CL_TIMEOUT_ACTOR); + outcome.setOperation(null); + outcome.setStart(lockStart.get()); + outcome.setEnd(Instant.now()); + outcome.setFinalOutcome(true); + onComplete(outcome); + + // cancel the operation, if it's still running + future.cancel(false); + + // this outcome is not used so just return "null" + return null; + } + + /** + * Cancels the operation. + */ + public void cancel() { + synchronized (this) { + if (future == null) { + return; + } + } + + future.cancel(false); + } + + /** + * Requests a lock on the {@link #targetEntity}. + * + * @return a future to await the lock + */ + private CompletableFuture requestLock() { + /* + * Failures are handled via the callback, and successes are discarded by + * sequence(), without passing them to onComplete(). + * + * Return a COPY of the future so that if we try to cancel it, we'll only cancel + * the copy, not the original. This is done by tacking thenApply() onto the end. + */ + lockStart.set(Instant.now()); + return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome); + } + + /** + * Indicates that the lock on the target entity is unavailable. + * + * @param outcome lock outcome + */ + private void lockUnavailable(OperationOutcome outcome) { + + // Note: NEVER invoke onStart() for locks; only invoke onComplete() + onComplete(outcome); + + /* + * Now that we've added the lock outcome to the queue, ensure the future is + * canceled, which may, itself, generate an operation outcome. + */ + cancel(); + } + + /** + * Handles responses provided via the "start" callback. Note: this is never be invoked + * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks. + * + * @param outcome outcome provided to the callback + */ + private void onStart(OperationOutcome outcome) { + if (GUARD_ACTOR.equals(outcome.getActor())) { + addOutcome(outcome); + } + } + + /** + * Handles responses provided via the "complete" callback. Note: this is never invoked + * for "successful" locks. + * + * @param outcome outcome provided to the callback + */ + private void onComplete(OperationOutcome outcome) { + + switch (outcome.getActor()) { + case LOCK_ACTOR: + case GUARD_ACTOR: + case CL_TIMEOUT_ACTOR: + addOutcome(outcome); + break; + + default: + if (outcome.isFor(actor, operation)) { + addOutcome(outcome); + } + break; + } + } + + /** + * Adds an outcome to {@link #outcomes}. + * + * @param outcome outcome to be added + */ + private synchronized void addOutcome(OperationOutcome outcome) { + /* + * This is synchronized to prevent nextStep() from invoking processOutcome() at + * the same time. + */ + + logger.debug("added outcome={} for {}", outcome, requestId); + outcomes.add(outcome); + + if (outcomes.peekFirst() == outcomes.peekLast()) { + // this is the first outcome in the queue - process it + processOutcome(); + } + } + + /** + * Looks for the next step in the queue. + * + * @return {@code true} if more responses are expected, {@code false} otherwise + */ + public synchronized boolean nextStep() { + switch (state) { + case LOCK_DENIED: + case LOCK_LOST: + case GUARD_DENIED: + case CONTROL_LOOP_TIMEOUT: + return false; + default: + break; + } + + OperationOutcome outcome = outcomes.peek(); + if (outcome == null) { + // empty queue + return true; + } + + if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) { + return false; + } + + // first item has been processed, remove it + outcomes.remove(); + if (!outcomes.isEmpty()) { + // have a new "first" item - process it + processOutcome(); + } + + return true; + } + + /** + * Processes the first item in {@link #outcomes}. Sets the state, increments + * {@link #attempts}, if appropriate, and stores the operation history in the DB. + */ + private synchronized void processOutcome() { + OperationOutcome outcome = outcomes.peek(); + logger.debug("process outcome={} for {}", outcome, requestId); + + switch (outcome.getActor()) { + + case CL_TIMEOUT_ACTOR: + state = State.CONTROL_LOOP_TIMEOUT; + break; + + case LOCK_ACTOR: + // lock is no longer available + if (state == State.ACTIVE) { + state = State.LOCK_DENIED; + storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock"); + } else { + state = State.LOCK_LOST; + storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock"); + } + break; + + case GUARD_ACTOR: + if (outcome.getEnd() == null) { + state = State.GUARD_STARTED; + } else if (outcome.getResult() == PolicyResult.SUCCESS) { + state = State.GUARD_PERMITTED; + } else { + state = State.GUARD_DENIED; + storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard"); + } + break; + + default: + // operation completed + ++attempts; + state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS + : State.OPERATION_FAILURE); + operationHistory.add(new Operation(outcome)); + storeOperationInDataBase(); + break; + } + + // indicate that this has changed + operContext.updated(this); + } + + /** + * Get the operation, as a message. + * + * @return the operation, as a message + */ + public String getOperationMessage() { + Operation last = operationHistory.peekLast(); + return (last == null ? null : last.getClOperation().toMessage()); + } + + /** + * Gets the operation result. + * + * @return the operation result + */ + public PolicyResult getOperationResult() { + Operation last = operationHistory.peekLast(); + return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult()); + } + + /** + * Get the latest operation history. + * + * @return the latest operation history + */ + public String getOperationHistory() { + Operation last = operationHistory.peekLast(); + return (last == null ? null : last.clOperation.toHistory()); + } + + /** + * Get the history. + * + * @return the list of control loop operations + */ + public List getHistory() { + return operationHistory.stream().map(Operation::getClOperation).map(ControlLoopOperation::new) + .collect(Collectors.toList()); + } + + /** + * Stores a failure in the DB. + * + * @param outcome operation outcome + * @param result result to put into the DB + * @param message message to put into the DB + */ + private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) { + outcome.setActor(actor); + outcome.setOperation(operation); + outcome.setMessage(message); + outcome.setResult(result); + + operationHistory.add(new Operation(outcome)); + storeOperationInDataBase(); + } + + /** + * Stores the latest operation in the DB. + */ + private void storeOperationInDataBase() { + operContext.getDataManager().store(requestId, eventContext.getEvent(), + operationHistory.peekLast().getClOperation()); + } + + /** + * Determines the target entity. + * + * @return a future to determine the target entity, or {@code null} if the entity has + * already been determined + */ + protected CompletableFuture detmTarget() { + if (policy.getTarget() == null) { + throw new IllegalArgumentException("The target is null"); + } + + if (policy.getTarget().getType() == null) { + throw new IllegalArgumentException("The target type is null"); + } + + switch (policy.getTarget().getType()) { + case PNF: + return detmPnfTarget(); + case VM: + case VNF: + case VFMODULE: + return detmVfModuleTarget(); + default: + throw new IllegalArgumentException("The target type is not supported"); + } + } + + /** + * Determines the PNF target entity. + * + * @return a future to determine the target entity, or {@code null} if the entity has + * already been determined + */ + private CompletableFuture detmPnfTarget() { + if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) { + throw new IllegalArgumentException("Target does not match target type"); + } + + targetEntity = eventContext.getEnrichment().get(PNF_NAME); + if (targetEntity == null) { + throw new IllegalArgumentException("AAI section is missing " + PNF_NAME); + } + + return null; + } + + /** + * Determines the VF Module target entity. + * + * @return a future to determine the target entity, or {@code null} if the entity has + * already been determined + */ + private CompletableFuture detmVfModuleTarget() { + String targetFieldName = eventContext.getEvent().getTarget(); + if (targetFieldName == null) { + throw new IllegalArgumentException("Target is null"); + } + + switch (targetFieldName.toLowerCase()) { + case VSERVER_VSERVER_NAME: + targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME); + break; + case GENERIC_VNF_VNF_ID: + targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID); + break; + case GENERIC_VNF_VNF_NAME: + return detmVnfName(); + default: + throw new IllegalArgumentException("Target does not match target type"); + } + + if (targetEntity == null) { + throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName); + } + + return null; + } + + /** + * Determines the VNF Name target entity. + * + * @return a future to determine the target entity, or {@code null} if the entity has + * already been determined + */ + @SuppressWarnings("unchecked") + private CompletableFuture detmVnfName() { + // if the onset is enriched with the vnf-id, we don't need an A&AI response + targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID); + if (targetEntity != null) { + return null; + } + + // vnf-id was not in the onset - obtain it via the custom query + + // @formatter:off + ControlLoopOperationParams cqparams = params.toBuilder() + .actor(AaiConstants.ACTOR_NAME) + .operation(AaiCqResponse.OPERATION) + .targetEntity("") + .build(); + // @formatter:on + + // perform custom query and then extract the VNF ID from it + return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams), + this::extractVnfFromCq); + } + + /** + * Extracts the VNF Name target entity from the custom query data. + * + * @return {@code null} + */ + private CompletableFuture extractVnfFromCq() { + // already have the CQ data + AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY); + if (cq.getDefaultGenericVnf() == null) { + throw new IllegalArgumentException("No vnf-id found"); + } + + targetEntity = cq.getDefaultGenericVnf().getVnfId(); + if (targetEntity == null) { + throw new IllegalArgumentException("No vnf-id found"); + } + + return null; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/EventManagerServices.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/EventManagerServices.java new file mode 100644 index 000000000..d8668e47d --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/EventManagerServices.java @@ -0,0 +1,179 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Properties; +import lombok.Getter; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.utils.resources.ResourceUtils; +import org.onap.policy.controlloop.actor.guard.GuardActorServiceProvider; +import org.onap.policy.controlloop.actor.guard.GuardConfig; +import org.onap.policy.controlloop.actor.guard.GuardOperation; +import org.onap.policy.controlloop.actor.guard.GuardOperator; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerImpl; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerStub; +import org.onap.policy.controlloop.utils.ControlLoopUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Services used by the ControlLoopEventManager. + */ +@Getter +public class EventManagerServices { + public static final Logger logger = LoggerFactory.getLogger(EventManagerServices.class); + public static final String ACTOR_SERVICE_PROPERTIES = "actor.service"; + public static final String DATA_MANAGER_PROPERTIES = "operation.history"; + + public final ActorService actorService = new ActorService(); + + // assume we're using a stub until proven otherwise + public final OperationHistoryDataManager dataManager; + + /** + * Constructs the object. Configures and starts the actor service. Initializes + * {@link #dataManager}, to a "real" data manager, if guards are enabled. + * + * @param configFileName configuration file name + */ + public EventManagerServices(String configFileName) { + // configure and start actor services + Properties props = startActorService(configFileName); + + if (isGuardEnabled()) { + // guards are enabled - use a real data manager + dataManager = makeDataManager(props); + } else { + // guards are disabled - use a stub data manager + dataManager = new OperationHistoryDataManagerStub(); + } + } + + /** + * Configures and starts the actor service. + * + * @param configFileName configuration file name + * @return the properties that were loaded from the configuration file + */ + public Properties startActorService(String configFileName) { + try (InputStream inpstr = openConfigFile(configFileName)) { + Properties props = new Properties(); + props.load(inpstr); + + Map parameters = ControlLoopUtils.toObject(props, ACTOR_SERVICE_PROPERTIES); + ControlLoopUtils.compressLists(parameters); + + actorService.configure(parameters); + actorService.start(); + + return props; + + } catch (RuntimeException | IOException e) { + logger.error("cannot configure/start actor service"); + throw new IllegalStateException(e); + } + } + + /** + * Opens the config file. + * + * @param configFileName configuration file name + * @return the file's input stream + * @throws FileNotFoundException if the file cannot be found + */ + private InputStream openConfigFile(String configFileName) throws FileNotFoundException { + InputStream inpstr = ResourceUtils.getResourceAsStream(configFileName); + if (inpstr == null) { + throw new FileNotFoundException(configFileName); + } + + return inpstr; + } + + /** + * Determines if guards are enabled. + * + * @return {@code true} if guards are enabled, {@code false} otherwise + */ + public boolean isGuardEnabled() { + try { + GuardOperator guard = (GuardOperator) getActorService().getActor(GuardActorServiceProvider.NAME) + .getOperator(GuardOperation.NAME); + if (!guard.isConfigured()) { + logger.warn("cannot check 'disabled' property in GUARD actor - assuming disabled"); + return false; + } + + GuardConfig config = (GuardConfig) guard.getCurrentConfig(); + if (config.isDisabled()) { + logger.warn("guard disabled"); + return false; + } + + if (!guard.isAlive()) { + logger.warn("guard actor is not running"); + return false; + } + + return true; + + } catch (RuntimeException e) { + logger.warn("cannot check 'disabled' property in GUARD actor - assuming disabled", e); + return false; + } + } + + /** + * Makes and starts the data manager. + * + * @param props properties with which to configure the data manager + * @return a new data manager + */ + public OperationHistoryDataManagerImpl makeDataManager(Properties props) { + try { + Map parameters = ControlLoopUtils.toObject(props, DATA_MANAGER_PROPERTIES); + OperationHistoryDataManagerParams params = Util.translate(DATA_MANAGER_PROPERTIES, parameters, + OperationHistoryDataManagerParams.class); + ValidationResult result = params.validate(DATA_MANAGER_PROPERTIES); + if (!result.isValid()) { + throw new IllegalArgumentException("invalid data manager properties:\n" + result.getResult()); + } + + OperationHistoryDataManagerImpl mgr = new OperationHistoryDataManagerImpl(params); + mgr.start(); + + return mgr; + + } catch (RuntimeException e) { + logger.error("cannot start operation history data manager"); + actorService.stop(); + throw e; + } + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockData.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockData.java new file mode 100644 index 000000000..835600086 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockData.java @@ -0,0 +1,181 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Data for an individual lock. + */ +public class LockData implements LockCallback { + private static final Logger logger = LoggerFactory.getLogger(LockData.class); + + private final String targetEntity; + private final UUID requestId; + + /** + * Time when this was created. + */ + private final Instant createTime = Instant.now(); + + /** + * Future for obtaining the lock. Initially incomplete. + */ + private final AtomicReference> future = + new AtomicReference<>(new CompletableFuture<>()); + + /** + * The lock. + */ + private Lock theLock = null; + + /** + * Listeners to invoke if the lock is unavailable/lost. + */ + private final List> unavailableCallbacks = new ArrayList<>(); + + /** + * Set to a failed outcome, if the lock becomes unavailable. + */ + private OperationOutcome failedOutcome = null; + + + /** + * Constructs the object. + * + * @param targetEntity target entity + */ + public LockData(String targetEntity, UUID requestId) { + this.targetEntity = targetEntity; + this.requestId = requestId; + } + + /** + * Gets the future to be completed when the lock operation completes. + * + * @return the lock operation future + */ + public CompletableFuture getFuture() { + return future.get(); + } + + /** + * Adds a callback to be invoked if the lock becomes unavailable. + * + * @param callback callback to be added + */ + public void addUnavailableCallback(Consumer callback) { + synchronized (this) { + if (failedOutcome == null) { + // hasn't failed yet - add it to the list + unavailableCallbacks.add(callback); + return; + } + } + + // already failed - invoke the callback immediately + callback.accept(failedOutcome); + } + + /** + * Frees the lock. + */ + public void free() { + Lock lock; + + synchronized (this) { + if ((lock = theLock) == null) { + return; + } + } + + lock.free(); + } + + @Override + public synchronized void lockAvailable(Lock lock) { + logger.warn("lock granted on {} for {}", targetEntity, requestId); + theLock = lock; + + OperationOutcome outcome = makeOutcome(); + outcome.setResult(PolicyResult.SUCCESS); + outcome.setMessage(ControlLoopOperation.SUCCESS_MSG); + + future.get().complete(outcome); + } + + @Override + public void lockUnavailable(Lock unused) { + synchronized (this) { + logger.warn("lock unavailable on {} for {}", targetEntity, requestId); + failedOutcome = makeOutcome(); + failedOutcome.setResult(PolicyResult.FAILURE); + failedOutcome.setMessage(ControlLoopOperation.FAILED_MSG); + } + + /* + * In case the future was already completed successfully, replace it with a failed + * future, but complete the old one, too, in case it wasn't completed yet. + */ + future.getAndSet(CompletableFuture.completedFuture(failedOutcome)).complete(failedOutcome); + + for (Consumer callback : unavailableCallbacks) { + try { + callback.accept(new OperationOutcome(failedOutcome)); + } catch (RuntimeException e) { + logger.warn("lock callback threw an exception for {}", requestId, e); + } + } + + unavailableCallbacks.clear(); + } + + /** + * Makes a lock operation outcome. + * + * @return a new lock operation outcome + */ + private OperationOutcome makeOutcome() { + OperationOutcome outcome = new OperationOutcome(); + outcome.setActor(ControlLoopOperationManager2.LOCK_ACTOR); + outcome.setOperation(ControlLoopOperationManager2.LOCK_OPERATION); + outcome.setTarget(targetEntity); + outcome.setFinalOutcome(true); + outcome.setStart(createTime); + outcome.setEnd(Instant.now()); + + return outcome; + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ManagerContext.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ManagerContext.java new file mode 100644 index 000000000..0dcd30269 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ManagerContext.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.onap.policy.controlloop.actorserviceprovider.ActorService; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.ophistory.OperationHistoryDataManager; + +/** + * Context for the Operation Manager. + */ +public interface ManagerContext { + + /** + * Gets the actor service. + * + * @return the actor service + */ + ActorService getActorService(); + + /** + * Gets the operation history data manager. + * + * @return the operation history data manager + */ + OperationHistoryDataManager getDataManager(); + + /** + * Requests a lock on the specified target. + * + * @param target target to be locked + * @param lockUnavailableCallback callback to be invoked if the lock is + * unavailable/lost + * @return a future to await the lock + */ + CompletableFuture requestLock(String target, Consumer lockUnavailableCallback); + + /** + * Indicates that the given operation manager has been updated. + * + * @param operationMgr operation manager that has been updated + */ + void updated(ControlLoopOperationManager2 operationMgr); +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManager.java new file mode 100644 index 000000000..a1774ea6f --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManager.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.ophistory; + +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; + +/** + * Data manager for the Operation History table. + */ +public interface OperationHistoryDataManager { + + /** + * Stores an operation in the DB. If the queue is full, then the oldest records is + * discarded. + * + * @param requestId request ID + * @param event event with which the operation is associated + * @param operation operation to be stored + */ + void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation); + + /** + * Starts the background thread. + */ + public void start(); + + /** + * Stops the background thread and places an "end" item into {@link #operations}. + */ + public void stop(); +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java new file mode 100644 index 000000000..f7576f139 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java @@ -0,0 +1,295 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.ophistory; + +import java.util.Date; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.eclipse.persistence.config.PersistenceUnitProperties; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.utils.jpa.EntityMgrCloser; +import org.onap.policy.common.utils.jpa.EntityTransCloser; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.database.operationshistory.Dbao; +import org.onap.policy.guard.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data manager that stores records in the DB, asynchronously, using a background thread. + */ +public class OperationHistoryDataManagerImpl implements OperationHistoryDataManager { + private static final Logger logger = LoggerFactory.getLogger(OperationHistoryDataManagerImpl.class); + + /** + * Added to the end of {@link #operations} when {@link #stop()} is called. This is + * used to get the background thread out of a blocking wait for the next record. + */ + private static final Record END_MARKER = new Record(); + + // copied from the parameters + private final int maxQueueLength; + private final int batchSize; + + private final EntityManagerFactory emFactory; + + /** + * Thread that takes records from {@link #operations} and stores them in the DB. + */ + private Thread thread; + + /** + * Set to {@code true} to stop the background thread. + */ + private boolean stopped = false; + + /** + * Queue of operations waiting to be stored in the DB. When {@link #stop()} is called, + * an {@link #END_MARKER} is added to the end of the queue. + */ + private final BlockingQueue operations = new LinkedBlockingQueue<>(); + + /** + * Number of records that have been added to the DB by this data manager instance. + */ + @Getter + private long recordsAdded = 0; + + + /** + * Constructs the object. + * + * @param params data manager parameters + */ + public OperationHistoryDataManagerImpl(OperationHistoryDataManagerParams params) { + ValidationResult result = params.validate("data-manager-properties"); + if (!result.isValid()) { + throw new IllegalArgumentException(result.getResult()); + } + + this.maxQueueLength = params.getMaxQueueLength(); + this.batchSize = params.getBatchSize(); + + // create the factory using the properties + Properties props = toProperties(params); + this.emFactory = makeEntityManagerFactory(params.getPersistenceUnit(), props); + } + + @Override + public synchronized void start() { + if (stopped || thread != null) { + // already started + return; + } + + thread = makeThread(emFactory, this::run); + thread.setDaemon(true); + thread.start(); + } + + @Override + public synchronized void stop() { + stopped = true; + + if (thread == null) { + // no thread to close the factory - do it here + emFactory.close(); + + } else { + // the thread will close the factory when it sees the end marker + operations.add(END_MARKER); + } + } + + @Override + public synchronized void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) { + + if (stopped) { + logger.warn("operation history thread is stopped, discarding requestId={} event={} operation={}", requestId, + event, operation); + return; + } + + operations.add(new Record(requestId, event, operation)); + + if (operations.size() > maxQueueLength) { + Record discarded = operations.remove(); + logger.warn("too many items to store in the operation history table, discarding {}", discarded); + } + } + + /** + * Takes records from {@link #operations} and stores them in the queue. Continues to + * run until {@link #stop()} is invoked, or the thread is interrupted. + * + * @param emfactory entity manager factory + */ + private void run(EntityManagerFactory emfactory) { + try { + // store records until stopped, continuing if an exception occurs + while (!stopped) { + try { + Record triple = operations.take(); + storeBatch(emfactory.createEntityManager(), triple); + + } catch (RuntimeException e) { + logger.error("failed to save data to operation history table", e); + + } catch (InterruptedException e) { + logger.error("interrupted, discarding remaining operation history data", e); + Thread.currentThread().interrupt(); + return; + } + } + + storeRemainingRecords(emfactory); + + } finally { + synchronized (this) { + stopped = true; + } + + emfactory.close(); + } + } + + /** + * Store any remaining records, but stop at the first exception. + * + * @param emfactory entity manager factory + */ + private void storeRemainingRecords(EntityManagerFactory emfactory) { + try { + while (!operations.isEmpty()) { + storeBatch(emfactory.createEntityManager(), operations.poll()); + } + + } catch (RuntimeException e) { + logger.error("failed to save remaining data to operation history table", e); + } + } + + /** + * Stores a batch of records. + * + * @param entityManager entity manager + * @param firstRecord first record to be stored + */ + private void storeBatch(EntityManager entityManager, Record firstRecord) { + + try (EntityMgrCloser emc = new EntityMgrCloser(entityManager); + EntityTransCloser trans = new EntityTransCloser(entityManager.getTransaction())) { + + int nrecords = 0; + Record record = firstRecord; + + while (record != null && record != END_MARKER) { + storeRecord(entityManager, record); + + if (++nrecords >= batchSize) { + break; + } + + record = operations.poll(); + } + + trans.commit(); + recordsAdded += nrecords; + } + } + + /** + * Stores a record. + * + * @param entityManager entity manager + * @param record record to be stored + */ + private void storeRecord(EntityManager entityMgr, Record record) { + + Dbao newEntry = new Dbao(); + + final VirtualControlLoopEvent event = record.getEvent(); + final ControlLoopOperation operation = record.getOperation(); + + newEntry.setClosedLoopName(event.getClosedLoopControlName()); + newEntry.setRequestId(record.getRequestId()); + newEntry.setActor(operation.getActor()); + newEntry.setOperation(operation.getOperation()); + newEntry.setTarget(operation.getTarget()); + newEntry.setSubrequestId(operation.getSubRequestId()); + newEntry.setMessage(operation.getMessage()); + newEntry.setOutcome(operation.getOutcome()); + if (operation.getStart() != null) { + newEntry.setStarttime(new Date(operation.getStart().toEpochMilli())); + } + if (operation.getEnd() != null) { + newEntry.setEndtime(new Date(operation.getEnd().toEpochMilli())); + } + + entityMgr.persist(newEntry); + } + + /** + * Converts the parameters to Properties. + * + * @param params parameters to be converted + * @return a new property set + */ + private Properties toProperties(OperationHistoryDataManagerParams params) { + Properties props = new Properties(); + props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl()); + props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName()); + props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword()); + props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader()); + + return props; + } + + @Getter + @NoArgsConstructor + @AllArgsConstructor + @ToString + private static class Record { + private String requestId; + private VirtualControlLoopEvent event; + private ControlLoopOperation operation; + } + + // the following may be overridden by junit tests + + protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) { + return Persistence.createEntityManagerFactory(opsHistPu, props); + } + + protected Thread makeThread(EntityManagerFactory emfactory, Consumer command) { + return new Thread(() -> command.accept(emfactory)); + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerParams.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerParams.java new file mode 100644 index 000000000..fc919d845 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerParams.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.ophistory; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.onap.policy.common.parameters.BeanValidator; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; +import org.onap.policy.common.parameters.annotations.NotNull; + +/** + * Parameters for a Data Manager. + */ +@NotNull +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OperationHistoryDataManagerParams { + public static final String DEFAULT_PU = "OperationsHistoryPU"; + + @NotBlank + private String url; + @NotBlank + private String userName; + + // may be blank + private String password; + + @Builder.Default + private String persistenceUnit = DEFAULT_PU; + + /** + * Maximum number of records that can be waiting to be inserted into the DB. When the + * limit is reached, the oldest records are discarded. + */ + @Min(1) + @Builder.Default + private int maxQueueLength = 10000; + + /** + * Number of records to add the DB in one transaction. + */ + @Min(1) + @Builder.Default + private int batchSize = 100; + + /** + * Validates the parameters. + * + * @param resultName name of the result + * + * @return the validation result + */ + public ValidationResult validate(String resultName) { + return new BeanValidator().validateTop(resultName, this); + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerStub.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerStub.java new file mode 100644 index 000000000..e1e0cbe09 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerStub.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.ophistory; + +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; + +/** + * Stub implementation of a data manager; all methods return without doing anything. + */ +public class OperationHistoryDataManagerStub implements OperationHistoryDataManager { + + @Override + public void store(String requestId, VirtualControlLoopEvent event, ControlLoopOperation operation) { + // do nothing + } + + @Override + public void start() { + // do nothing + } + + @Override + public void stop() { + // do nothing + } +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java index 4ef1f75c9..154462247 100644 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java @@ -57,15 +57,16 @@ public class ControlLoopProcessor implements Serializable { private final ControlLoopPolicy policy; private String currentNestedPolicyId = null; + // not serializable, thus must be transient @Getter - private ToscaPolicy toscaOpPolicy; + private transient ToscaPolicy toscaOpPolicy; @Getter private DroolsPolicy domainOpPolicy; /** * Construct an instance from yaml. - * + * * @param yaml the yaml * @throws ControlLoopException if an error occurs */ @@ -198,7 +199,7 @@ public class ControlLoopProcessor implements Serializable { /** * Get the current policy. - * + * * @return the current policy * @throws ControlLoopException if an error occurs */ @@ -217,7 +218,7 @@ public class ControlLoopProcessor implements Serializable { /** * Get the next policy given a result of the current policy. - * + * * @param result the result of the current policy * @throws ControlLoopException if an error occurs */ diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/utils/ControlLoopUtils.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/utils/ControlLoopUtils.java index b5d95fed7..d311b07fc 100644 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/utils/ControlLoopUtils.java +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/utils/ControlLoopUtils.java @@ -18,6 +18,14 @@ package org.onap.policy.controlloop.utils; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.onap.policy.controlloop.ControlLoopException; import org.onap.policy.controlloop.drl.legacy.ControlLoopParams; import org.onap.policy.controlloop.processor.ControlLoopProcessor; @@ -31,6 +39,7 @@ import org.slf4j.LoggerFactory; public class ControlLoopUtils { public static final Logger logger = LoggerFactory.getLogger(ControlLoopUtils.class); + private static final Pattern NAME_PAT = Pattern.compile("(.*)\\[(\\d{1,3})\\]"); private ControlLoopUtils() { super(); @@ -51,4 +60,205 @@ public class ControlLoopUtils { } } + // TODO move the following to policy-common/utils + + /** + * Converts a set of properties to a Map. Supports json-path style property names with + * "." separating components, where components may have an optional subscript. + * + * @param properties properties to be converted + * @param prefix properties whose names begin with this prefix are included. The + * prefix is stripped from the name before adding the value to the map + * @return a hierarchical map representing the properties + */ + public static Map toObject(Properties properties, String prefix) { + String dottedPrefix = prefix + (prefix.isEmpty() || prefix.endsWith(".") ? "" : "."); + int pfxlen = dottedPrefix.length(); + + Map map = new LinkedHashMap<>(); + + for (String name : properties.stringPropertyNames()) { + if (name.startsWith(dottedPrefix)) { + String[] components = name.substring(pfxlen).split("[.]"); + setProperty(map, components, properties.getProperty(name)); + } + } + + return map; + } + + /** + * Sets a property within a hierarchical map. + * + * @param map map into which the value should be placed + * @param names property name components + * @param value value to be placed into the map + */ + private static void setProperty(Map map, String[] names, String value) { + Map node = map; + + final int lastComp = names.length - 1; + + // process all but the final component + for (int comp = 0; comp < lastComp; ++comp) { + node = getNode(node, names[comp]); + } + + // process the final component + String name = names[lastComp]; + Matcher matcher = NAME_PAT.matcher(name); + + if (!matcher.matches()) { + // no subscript + node.put(name, value); + return; + } + + // subscripted + List array = getArray(node, matcher.group(1)); + int index = Integer.parseInt(matcher.group(2)); + expand(array, index); + array.set(index, value); + } + + /** + * Gets a node. + * + * @param map map from which to get the object + * @param name name of the element to get from the map, with an optional subscript + * @return a Map + */ + @SuppressWarnings("unchecked") + private static Map getNode(Map map, String name) { + Matcher matcher = NAME_PAT.matcher(name); + + if (!matcher.matches()) { + // no subscript + return getObject(map, name); + } + + // subscripted + List array = getArray(map, matcher.group(1)); + int index = Integer.parseInt(matcher.group(2)); + expand(array, index); + + Object item = array.get(index); + if (item instanceof Map) { + return (Map) item; + + } else { + LinkedHashMap result = new LinkedHashMap<>(); + array.set(index, result); + return result; + } + } + + /** + * Ensures that an array's size is large enough to hold the specified element. + * + * @param array array to be expanded + * @param index index of the desired element + */ + private static void expand(List array, int index) { + while (array.size() <= index) { + array.add(null); + } + } + + /** + * Gets an object (i.e., Map) from a map. If the particular element is not a Map, then + * it is replaced with an empty Map. + * + * @param map map from which to get the object + * @param name name of the element to get from the map, without any subscript + * @return a Map + */ + private static Map getObject(Map map, String name) { + @SuppressWarnings("unchecked") + Map result = (Map) map.compute(name, (key, value) -> { + if (value instanceof Map) { + return value; + } else { + return new LinkedHashMap<>(); + } + }); + + return result; + } + + /** + * Gets an array from a map. If the particular element is not an array, then it is + * replaced with an empty array. + * + * @param map map from which to get the array + * @param name name of the element to get from the map, without any subscript + * @return an array + */ + private static List getArray(Map map, String name) { + @SuppressWarnings("unchecked") + List result = (List) map.compute(name, (key, value) -> { + if (value instanceof List) { + return value; + } else { + return new ArrayList<>(); + } + }); + + return result; + } + + /** + * Compresses lists contained within a generic object, removing all {@code null} + * items. + * + * @param object object to be compressed + * @return the original object, modified in place + */ + public static Object compressLists(Object object) { + if (object instanceof Map) { + @SuppressWarnings("unchecked") + Map asMap = (Map) object; + compressMapValues(asMap); + + } else if (object instanceof List) { + @SuppressWarnings("unchecked") + List asList = (List) object; + compressListItems(asList); + } + + // else: ignore anything else + + return object; + } + + /** + * Walks a hierarchical map and removes {@code null} items found in any Lists. + * + * @param map map whose lists are to be compressed + */ + private static void compressMapValues(Map map) { + for (Object value : map.values()) { + compressLists(value); + } + } + + /** + * Removes {@code null} items from the list. In addition, it walks the items within + * the list, compressing them, as well. + * + * @param list the list to be compressed + */ + private static void compressListItems(List list) { + Iterator iter = list.iterator(); + while (iter.hasNext()) { + Object item = iter.next(); + if (item == null) { + // null item - remove it + iter.remove(); + + } else { + compressLists(item); + } + } + } } -- cgit 1.2.3-korg