diff options
Diffstat (limited to 'controlloop/common/eventmanager/src/main')
7 files changed, 3 insertions, 2376 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java deleted file mode 100644 index 5546a0baa..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java +++ /dev/null @@ -1,880 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * controlloop event manager - * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.eventmanager; - -import static org.onap.policy.controlloop.ControlLoopTargetType.PNF; -import static org.onap.policy.controlloop.ControlLoopTargetType.VM; -import static org.onap.policy.controlloop.ControlLoopTargetType.VNF; - -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.onap.policy.aai.AaiCqResponse; -import org.onap.policy.aai.AaiManager; -import org.onap.policy.aai.util.AaiException; -import org.onap.policy.controlloop.ControlLoopEventStatus; -import org.onap.policy.controlloop.ControlLoopException; -import org.onap.policy.controlloop.ControlLoopNotificationType; -import org.onap.policy.controlloop.ControlLoopOperation; -import org.onap.policy.controlloop.VirtualControlLoopEvent; -import org.onap.policy.controlloop.VirtualControlLoopNotification; -import org.onap.policy.controlloop.policy.FinalResult; -import org.onap.policy.controlloop.policy.Policy; -import org.onap.policy.controlloop.processor.ControlLoopProcessor; -import org.onap.policy.drools.core.lock.Lock; -import org.onap.policy.drools.core.lock.LockCallback; -import org.onap.policy.drools.core.lock.LockImpl; -import org.onap.policy.drools.core.lock.LockState; -import org.onap.policy.drools.system.PolicyEngineConstants; -import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; -import org.onap.policy.rest.RestManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ControlLoopEventManager implements Serializable { - public static final String PROV_STATUS_ACTIVE = "ACTIVE"; - private static final String VM_NAME = "VM_NAME"; - private static final String VNF_NAME = "VNF_NAME"; - public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id"; - public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name"; - public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name"; - public static final String GENERIC_VNF_IS_CLOSED_LOOP_DISABLED = "generic-vnf.is-closed-loop-disabled"; - public static final String VSERVER_IS_CLOSED_LOOP_DISABLED = "vserver.is-closed-loop-disabled"; - private static final String PNF_IS_IN_MAINT = "pnf.in-maint"; - public static final String GENERIC_VNF_PROV_STATUS = "generic-vnf.prov-status"; - public static final String VSERVER_PROV_STATUS = "vserver.prov-status"; - public static final String PNF_ID = "pnf.pnf-id"; - public static final String PNF_NAME = "pnf.pnf-name"; - - public static final String AAI_URL = "aai.url"; - public static final String AAI_USERNAME_PROPERTY = "aai.username"; - public static final String AAI_PASS_PROPERTY = "aai.password"; - - - /** - * Additional time, in seconds, to add to a "lock" request. This ensures that the lock won't expire right before an - * operation completes. - */ - private static final int ADDITIONAL_LOCK_SEC = 60; - - private static final Logger logger = LoggerFactory.getLogger(ControlLoopEventManager.class); - - private static final long serialVersionUID = -1216568161322872641L; - - private static final Set<String> VALID_TARGETS; - - static { - VALID_TARGETS = Collections.unmodifiableSet(new HashSet<>( - Stream.of(VM_NAME, VNF_NAME, VSERVER_VSERVER_NAME, GENERIC_VNF_VNF_ID, GENERIC_VNF_VNF_NAME, PNF_NAME) - .map(String::toLowerCase).collect(Collectors.toList()))); - } - - public final String closedLoopControlName; - private final UUID requestId; - - private String controlLoopResult; - private ControlLoopProcessor processor = null; - private VirtualControlLoopEvent onset; - private Integer numOnsets = 0; - private Integer numAbatements = 0; - private VirtualControlLoopEvent abatement; - private FinalResult controlLoopTimedOut = null; - - private boolean isActivated = false; - private LinkedList<ControlLoopOperation> controlLoopHistory = new LinkedList<>(); - private ControlLoopOperationManager currentOperation = null; - private ControlLoopOperationManager lastOperationManager = null; - private transient Lock targetLock = null; - private boolean useTargetLock = true; - - /** - * Constructs the object. - * - * @param closedLoopControlName name of the control loop - * @param requestId ID of the request with which this manager is associated - */ - public ControlLoopEventManager(String closedLoopControlName, UUID requestId) { - this.closedLoopControlName = closedLoopControlName; - this.requestId = requestId; - } - - public String getClosedLoopControlName() { - return closedLoopControlName; - } - - public String getControlLoopResult() { - return controlLoopResult; - } - - public void setControlLoopResult(String controlLoopResult) { - this.controlLoopResult = controlLoopResult; - } - - public Integer getNumOnsets() { - return numOnsets; - } - - public void setNumOnsets(Integer numOnsets) { - this.numOnsets = numOnsets; - } - - public Integer getNumAbatements() { - return numAbatements; - } - - public void setNumAbatements(Integer numAbatements) { - this.numAbatements = numAbatements; - } - - public boolean isActivated() { - return isActivated; - } - - public void setActivated(boolean isActivated) { - this.isActivated = isActivated; - } - - public boolean useTargetLock() { - return useTargetLock(); - } - - public void setUseTargetLock(boolean useTargetLock) { - this.useTargetLock = useTargetLock; - } - - public VirtualControlLoopEvent getOnsetEvent() { - return this.onset; - } - - public VirtualControlLoopEvent getAbatementEvent() { - return this.abatement; - } - - public ControlLoopProcessor getProcessor() { - return this.processor; - } - - public UUID getRequestId() { - return requestId; - } - - - private VirtualControlLoopNotification rejectNotification(VirtualControlLoopEvent event, String message) { - VirtualControlLoopNotification notification = new VirtualControlLoopNotification(event); - notification.setNotification(ControlLoopNotificationType.REJECTED); - notification.setMessage(message); - return notification; - } - - /** - * Preactivations check for an event. - * - * @param event the event - * @return the VirtualControlLoopNotification - */ - private VirtualControlLoopNotification preActivationChecks(VirtualControlLoopEvent event) { - try { - // - // This method should ONLY be called ONCE - // - if (this.isActivated) { - return rejectNotification(event, "ControlLoopEventManager has already been activated."); - } - - // - // Syntax check the event - // - checkEventSyntax(event); - } catch (ControlLoopException e) { - logger.warn("{}: invalid event syntax: ", this, e); - return rejectNotification(event, e.getMessage()); - - } - - return new VirtualControlLoopNotification(event); - } - - /** - * Activate a control loop event. - * - * @param event the event - * @return the VirtualControlLoopNotification - */ - public VirtualControlLoopNotification activate(VirtualControlLoopEvent event) { - VirtualControlLoopNotification notification = preActivationChecks(event); - if (notification.getNotification() == ControlLoopNotificationType.REJECTED) { - return notification; - } - - return postActivate(event, notification); - } - - /** - * Activate a control loop event. - * - * @param yamlSpecification the yaml specification - * @param event the event - * @return the VirtualControlLoopNotification - */ - public VirtualControlLoopNotification activate(String yamlSpecification, VirtualControlLoopEvent event) { - VirtualControlLoopNotification notification = preActivationChecks(event); - if (notification.getNotification() == ControlLoopNotificationType.REJECTED) { - return notification; - } - - if (yamlSpecification == null || yamlSpecification.length() < 1) { - return rejectNotification(event, "yaml specification is null or 0 length"); - } - - String decodedYaml = null; - try { - decodedYaml = URLDecoder.decode(yamlSpecification, "UTF-8"); - if (decodedYaml != null && decodedYaml.length() > 0) { - yamlSpecification = decodedYaml; - } - } catch (UnsupportedEncodingException e) { - logger.warn("{}: YAML decode in activate by YAML specification and event threw: ", this, e); - return rejectNotification(event, e.getMessage()); - } - - try { - // - // Parse the YAML specification - // - this.processor = new ControlLoopProcessor(yamlSpecification); - } catch (ControlLoopException e) { - logger.error("{}: activate by YAML specification and event threw: ", this, e); - return rejectNotification(event, e.getMessage()); - } - - return postActivate(event, notification); - } - - /** - * Activate a control loop event. - * - * @param toscaPolicy the tosca policy - * @param event the event - * @return the VirtualControlLoopNotification - */ - public VirtualControlLoopNotification activate(ToscaPolicy toscaPolicy, VirtualControlLoopEvent event) { - VirtualControlLoopNotification notification = preActivationChecks(event); - if (notification.getNotification() == ControlLoopNotificationType.REJECTED) { - return notification; - } - - try { - this.processor = new ControlLoopProcessor(toscaPolicy); - } catch (ControlLoopException e) { - logger.error("{}: activate from Tosca Policy threw: ", this, e); - return rejectNotification(event, e.getMessage()); - } - - return postActivate(event, notification); - } - - private VirtualControlLoopNotification postActivate( - VirtualControlLoopEvent event, VirtualControlLoopNotification notification) { - // - // At this point we are good to go with this event - // - this.onset = event; - this.numOnsets = 1; - - // - // Set ourselves as active - // - this.isActivated = true; - - notification.setNotification(ControlLoopNotificationType.ACTIVE); - return notification; - } - - /** - * Check if the control loop is final. - * - * @return a VirtualControlLoopNotification if the control loop is final, otherwise <code>null</code> is returned - * @throws ControlLoopException if an error occurs - */ - public VirtualControlLoopNotification isControlLoopFinal() throws ControlLoopException { - validateFinalControlLoop(); - // - // Ok, start creating the notification - // - VirtualControlLoopNotification notification = new VirtualControlLoopNotification(this.onset); - // - // Check if the overall control loop has timed out - // - if (this.isControlLoopTimedOut()) { - // - // Yes we have timed out - // - notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); - notification.setMessage("Control Loop timed out"); - notification.getHistory().addAll(this.controlLoopHistory); - return notification; - } - // - // Check if the current policy is Final - // - FinalResult result = this.processor.checkIsCurrentPolicyFinal(); - if (result == null) { - // - // we are not at a final result - // - return null; - } - - switch (result) { - case FINAL_FAILURE_EXCEPTION: - notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); - notification.setMessage("Exception in processing closed loop"); - break; - case FINAL_FAILURE: - case FINAL_FAILURE_RETRIES: - case FINAL_FAILURE_TIMEOUT: - case FINAL_FAILURE_GUARD: - notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); - break; - case FINAL_OPENLOOP: - notification.setNotification(ControlLoopNotificationType.FINAL_OPENLOOP); - break; - case FINAL_SUCCESS: - notification.setNotification(ControlLoopNotificationType.FINAL_SUCCESS); - break; - default: - return null; - } - // - // Be sure to add all the history - // - notification.getHistory().addAll(this.controlLoopHistory); - return notification; - } - - private void validateFinalControlLoop() throws ControlLoopException { - // - // Check if they activated us - // - if (!this.isActivated) { - throw new ControlLoopException("ControlLoopEventManager MUST be activated first."); - } - // - // Make sure we are expecting this call. - // - if (this.onset == null) { - throw new ControlLoopException("No onset event for ControlLoopEventManager."); - } - } - - /** - * Process the control loop. - * - * @return a ControlLoopOperationManager - * @throws ControlLoopException if an error occurs - */ - public ControlLoopOperationManager processControlLoop() throws ControlLoopException { - validateFinalControlLoop(); - // - // Is there a current operation? - // - if (this.currentOperation != null) { - // - // Throw an exception, or simply return the current operation? - // - throw new ControlLoopException("Already working an Operation, do not call this method."); - } - // - // Ensure we are not FINAL - // - VirtualControlLoopNotification notification = this.isControlLoopFinal(); - if (notification != null) { - // - // This is weird, we require them to call the isControlLoopFinal() method first - // - // We should really abstract this and avoid throwing an exception, because it really - // isn't an exception. - // - throw new ControlLoopException("Control Loop is in FINAL state, do not call this method."); - } - // - // Not final so get the policy that needs to be worked on. - // - Policy policy = this.processor.getCurrentPolicy(); - if (policy == null) { - throw new ControlLoopException("ControlLoopEventManager: processor came upon null Policy."); - } - // - // And setup an operation - // - this.lastOperationManager = this.currentOperation; - this.currentOperation = new ControlLoopOperationManager(this.onset, policy, this); - // - // Return it - // - return this.currentOperation; - } - - /** - * Finish an operation. - * - * @param operation the operation - */ - public void finishOperation(ControlLoopOperationManager operation) throws ControlLoopException { - // - // Verify we have a current operation - // - if (this.currentOperation != null) { - // - // Validate they are finishing the current operation - // PLD - this is simply comparing the policy. Do we want to equals the whole object? - // - if (this.currentOperation.policy.equals(operation.policy)) { - logger.debug("Finishing {} result is {}", this.currentOperation.policy.getRecipe(), - this.currentOperation.getOperationResult()); - // - // Save history - // - this.controlLoopHistory.addAll(this.currentOperation.getHistory()); - // - // Move to the next Policy - // - this.processor.nextPolicyForResult(this.currentOperation.getOperationResult()); - // - // Just null this out - // - this.lastOperationManager = this.currentOperation; - this.currentOperation = null; - - // - // Don't release the lock - it may be re-used by the next operation - // - - return; - } - logger.debug("Cannot finish current operation {} does not match given operation {}", - this.currentOperation.policy, operation.policy); - return; - } - throw new ControlLoopException("No operation to finish."); - } - - /** - * Obtain a lock for the current operation. - * - * @param callback call-back to be invoked when the lock state changes - * @return a pair containing the old lock and the new lock, either of which may be null - * @throws ControlLoopException if an error occurs - */ - public synchronized Pair<Lock, Lock> lockCurrentOperation(LockCallback callback) throws ControlLoopException { - // - // Sanity check - // - if (this.currentOperation == null) { - throw new ControlLoopException("Do not have a current operation."); - } - - // - // Release the old lock if it's for a different resource. - // - Lock oldLock = null; - if (this.targetLock != null - && !this.targetLock.getResourceId().equals(this.currentOperation.getTargetEntity())) { - logger.debug("{}: different resource - releasing old lock", getClosedLoopControlName()); - oldLock = this.targetLock; - this.targetLock = null; - } - - // keep the lock a little longer than the operation, including retries - int optimeout = Math.max(1, this.currentOperation.getOperationTimeout()); - int nattempts = 1 + Math.max(0, this.currentOperation.getMaxRetries()); - int holdSec = optimeout * nattempts + ADDITIONAL_LOCK_SEC; - - // - // Have we acquired it already? - // - if (this.targetLock != null) { - // we have the lock - just extend it - this.targetLock.extend(holdSec, callback); - return Pair.of(oldLock, null); - - } else if (this.useTargetLock) { - this.targetLock = createRealLock(this.currentOperation.getTargetEntity(), this.onset.getRequestId(), - holdSec, callback); - return Pair.of(oldLock, this.targetLock); - - } else { - // Not using target locks - create a lock w/o actually locking. - logger.debug("{}: not using target locking; using pseudo locks", getClosedLoopControlName()); - this.targetLock = createPseudoLock(this.currentOperation.getTargetEntity(), this.onset.getRequestId(), - holdSec, callback); - - // Note: no need to invoke callback, as the lock is already ACTIVE - - return Pair.of(oldLock, this.targetLock); - } - } - - /** - * Releases the lock for the current operation, deleting it from working memory. - * - * @return the lock, if the operation was locked, {@code null} otherwise - */ - public synchronized Lock unlockCurrentOperation() { - if (this.targetLock == null) { - return null; - } - - Lock lock = this.targetLock; - this.targetLock = null; - - lock.free(); - - return lock; - } - - public enum NewEventStatus { - FIRST_ONSET, SUBSEQUENT_ONSET, FIRST_ABATEMENT, SUBSEQUENT_ABATEMENT, SYNTAX_ERROR; - } - - /** - * An event onset/abatement. - * - * @param event the event - * @return the status - * @throws AaiException if an error occurs retrieving information from A&AI - */ - public NewEventStatus onNewEvent(VirtualControlLoopEvent event) throws AaiException { - try { - this.checkEventSyntax(event); - if (event.getClosedLoopEventStatus() == ControlLoopEventStatus.ONSET) { - // - // Check if this is our original ONSET - // - if (event.equals(this.onset)) { - // - // DO NOT retract it - // - return NewEventStatus.FIRST_ONSET; - } - // - // Log that we got an onset - // - this.numOnsets++; - return NewEventStatus.SUBSEQUENT_ONSET; - } else if (event.getClosedLoopEventStatus() == ControlLoopEventStatus.ABATED) { - // - // Have we already got an abatement? - // - if (this.abatement == null) { - // - // Save this - // - this.abatement = event; - // - // Keep track that we received another - // - this.numAbatements++; - // - // - // - return NewEventStatus.FIRST_ABATEMENT; - } else { - // - // Keep track that we received another - // - this.numAbatements++; - // - // - // - return NewEventStatus.SUBSEQUENT_ABATEMENT; - } - } - } catch (ControlLoopException e) { - logger.error("{}: onNewEvent threw: ", this, e); - } - return NewEventStatus.SYNTAX_ERROR; - } - - - /** - * Commit the abatement to the history database. - * - * @param message the abatement message - * @param outcome the abatement outcome - */ - public void commitAbatement(String message, String outcome) { - if (this.lastOperationManager == null) { - logger.error("{}: commitAbatement: no operation manager", this); - return; - } - try { - this.lastOperationManager.commitAbatement(message, outcome); - } catch (NoSuchElementException e) { - logger.error("{}: commitAbatement threw an exception ", this, e); - } - } - - - /** - * Set the control loop time out. - * - * @return a VirtualControlLoopNotification - */ - public VirtualControlLoopNotification setControlLoopTimedOut() { - this.controlLoopTimedOut = FinalResult.FINAL_FAILURE_TIMEOUT; - VirtualControlLoopNotification notification = new VirtualControlLoopNotification(this.onset); - notification.setNotification(ControlLoopNotificationType.FINAL_FAILURE); - notification.setMessage("Control Loop timed out"); - notification.getHistory().addAll(this.controlLoopHistory); - return notification; - } - - public boolean isControlLoopTimedOut() { - return (this.controlLoopTimedOut == FinalResult.FINAL_FAILURE_TIMEOUT); - } - - /** - * Get the control loop timeout. - * - * @param defaultTimeout the default timeout - * @return the timeout - */ - public int getControlLoopTimeout(Integer defaultTimeout) { - if (this.processor != null && this.processor.getControlLoop() != null) { - Integer timeout = this.processor.getControlLoop().getTimeout(); - if (timeout != null && timeout > 0) { - return timeout; - } - } - if (defaultTimeout != null) { - return defaultTimeout; - } - return 0; - } - - /** - * Check an event syntax. - * - * @param event the event syntax - * @throws ControlLoopException if an error occurs - */ - public void checkEventSyntax(VirtualControlLoopEvent event) throws ControlLoopException { - validateStatus(event); - if (StringUtils.isBlank(event.getClosedLoopControlName())) { - throw new ControlLoopException("No control loop name"); - } - if (event.getRequestId() == null) { - throw new ControlLoopException("No request ID"); - } - if (event.getClosedLoopEventStatus() == ControlLoopEventStatus.ABATED) { - return; - } - if (StringUtils.isBlank(event.getTarget())) { - throw new ControlLoopException("No target field"); - } else if (!VALID_TARGETS.contains(event.getTarget().toLowerCase())) { - throw new ControlLoopException("target field invalid - expecting VM_NAME or VNF_NAME"); - } - validateAaiData(event); - } - - private void validateStatus(VirtualControlLoopEvent event) throws ControlLoopException { - if (event.getClosedLoopEventStatus() == null - || (event.getClosedLoopEventStatus() != ControlLoopEventStatus.ONSET - && event.getClosedLoopEventStatus() != ControlLoopEventStatus.ABATED)) { - throw new ControlLoopException("Invalid value in closedLoopEventStatus"); - } - } - - private void validateAaiData(VirtualControlLoopEvent event) throws ControlLoopException { - Map<String, String> eventAai = event.getAai(); - if (eventAai == null) { - throw new ControlLoopException("AAI is null"); - } - switch (event.getTargetType()) { - case VM: - case VNF: - validateAaiVmVnfData(eventAai); - return; - case PNF: - validateAaiPnfData(eventAai); - return; - default: - throw new ControlLoopException("The target type is not supported"); - } - } - - private void validateAaiVmVnfData(Map<String, String> eventAai) throws ControlLoopException { - if (eventAai.get(GENERIC_VNF_VNF_ID) == null && eventAai.get(VSERVER_VSERVER_NAME) == null - && eventAai.get(GENERIC_VNF_VNF_NAME) == null) { - throw new ControlLoopException( - "generic-vnf.vnf-id or generic-vnf.vnf-name or vserver.vserver-name information missing"); - } - } - - private void validateAaiPnfData(Map<String, String> eventAai) throws ControlLoopException { - if (eventAai.get(PNF_NAME) == null) { - throw new ControlLoopException("AAI PNF object key pnf-name is missing"); - } - } - - /** - * Is closed loop disabled for an event. - * - * @param event the event - * @return <code>true</code> if the control loop is disabled, <code>false</code> otherwise - */ - public static boolean isClosedLoopDisabled(VirtualControlLoopEvent event) { - Map<String, String> aai = event.getAai(); - return (isAaiTrue(aai.get(VSERVER_IS_CLOSED_LOOP_DISABLED)) - || isAaiTrue(aai.get(GENERIC_VNF_IS_CLOSED_LOOP_DISABLED)) - || isAaiTrue(aai.get(PNF_IS_IN_MAINT))); - } - - /** - * Does provisioning status, for an event, have a value other than ACTIVE. - * - * @param event the event - * @return {@code true} if the provisioning status is neither ACTIVE nor {@code null}, {@code false} otherwise - */ - protected static boolean isProvStatusInactive(VirtualControlLoopEvent event) { - Map<String, String> aai = event.getAai(); - return (!PROV_STATUS_ACTIVE.equals(aai.getOrDefault(VSERVER_PROV_STATUS, PROV_STATUS_ACTIVE)) - || !PROV_STATUS_ACTIVE.equals(aai.getOrDefault(GENERIC_VNF_PROV_STATUS, PROV_STATUS_ACTIVE))); - } - - /** - * Determines the boolean value represented by the given AAI field value. - * - * @param aaiValue value to be examined - * @return the boolean value represented by the field value, or {@code false} if the value is {@code null} - */ - protected static boolean isAaiTrue(String aaiValue) { - return ("true".equalsIgnoreCase(aaiValue) || "T".equalsIgnoreCase(aaiValue) || "yes".equalsIgnoreCase(aaiValue) - || "Y".equalsIgnoreCase(aaiValue)); - } - - @Override - public String toString() { - return "ControlLoopEventManager [closedLoopControlName=" + closedLoopControlName + ", requestId=" + requestId - + ", processor=" + processor + ", onset=" + (onset != null ? onset.getRequestId() : "null") - + ", numOnsets=" + numOnsets + ", numAbatements=" + numAbatements + ", isActivated=" + isActivated - + ", currentOperation=" + currentOperation + ", targetLock=" + targetLock + "]"; - } - - /** - * This function calls Aai Custom Query and responds with the AaiCqResponse. - * - * @param event input event - * @return AaiCqResponse Response from Aai for custom query. Can not be null. - * @throws AaiException if error occurs - */ - public AaiCqResponse getCqResponse(VirtualControlLoopEvent event) throws AaiException { - - Map<String, String> aai = event.getAai(); - - if (aai.containsKey(VSERVER_IS_CLOSED_LOOP_DISABLED) || aai.containsKey(GENERIC_VNF_IS_CLOSED_LOOP_DISABLED)) { - - if (isClosedLoopDisabled(event)) { - throw new AaiException("is-closed-loop-disabled is set to true on VServer or VNF"); - } - - if (isProvStatusInactive(event)) { - throw new AaiException("prov-status is not ACTIVE on VServer or VNF"); - } - } - - if (!aai.containsKey(VSERVER_VSERVER_NAME)) { - throw new AaiException("Vserver name is missing"); - } - - UUID reqId = event.getRequestId(); - AaiCqResponse response = null; - String vserverId = event.getAai().get(VSERVER_VSERVER_NAME); - - String aaiHostUrl = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_URL); - String aaiUser = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_USERNAME_PROPERTY); - String aaiPassword = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_PASS_PROPERTY); - - response = new AaiManager(new RestManager()).getCustomQueryResponse(aaiHostUrl, aaiUser, aaiPassword, reqId, - vserverId); - - if (response == null) { - throw new AaiException("Target vnf-id could not be found"); - } - - return response; - - } - - /** - * Get the specified pnf data from aai. - * @param event the event containing pnf id. - * @return pnf key value data. - * @throws AaiException if an aai error occurs. - */ - public Map<String, String> getPnf(VirtualControlLoopEvent event) throws AaiException { - Map<String, String> aai = event.getAai(); - - if (!aai.containsKey(PNF_NAME)) { - throw new AaiException("Missing unique identifier for PNF AAI object in the event."); - } - - UUID reqId = event.getRequestId(); - String pnfName = event.getAai().get(PNF_NAME); - String aaiHostUrl = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_URL); - String aaiUser = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_USERNAME_PROPERTY); - String aaiPassword = PolicyEngineConstants.getManager().getEnvironmentProperty(AAI_PASS_PROPERTY); - - Map<String, String> pnfParams = - new AaiManager(new RestManager()).getPnf(aaiHostUrl, aaiUser, aaiPassword, reqId, pnfName); - - if (pnfParams == null) { - throw new AaiException("Aai response is undefined"); - } - return pnfParams; - } - - - // the following methods may be overridden by junit tests - - protected Lock createRealLock(String targetEntity, UUID requestId, int holdSec, LockCallback callback) { - return PolicyEngineConstants.getManager().createLock(targetEntity, requestId.toString(), holdSec, callback, - false); - } - - // note: the "callback" is required, because it will be invoked when lock.extend() is - // invoked - protected Lock createPseudoLock(String targetEntity, UUID requestId, int holdSec, LockCallback callback) { - return new LockImpl(LockState.ACTIVE, targetEntity, requestId.toString(), holdSec, callback); - } -} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java deleted file mode 100644 index d16468de2..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java +++ /dev/null @@ -1,1207 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * controlloop operation manager - * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved. - * Modifications Copyright (C) 2019 Tech Mahindra - * Modifications Copyright (C) 2019 Bell Canada. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.eventmanager; - -import java.io.Serializable; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Optional; -import java.util.Properties; -import javax.persistence.EntityManager; -import javax.persistence.Persistence; -import org.apache.commons.lang3.tuple.Pair; -import org.eclipse.persistence.config.PersistenceUnitProperties; -import org.onap.aai.domain.yang.GenericVnf; -import org.onap.aai.domain.yang.ServiceInstance; -import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; -import org.onap.policy.aai.AaiCqResponse; -import org.onap.policy.aai.util.AaiException; -import org.onap.policy.appc.Response; -import org.onap.policy.appc.ResponseCode; -import org.onap.policy.appclcm.AppcLcmDmaapWrapper; -import org.onap.policy.cds.CdsResponse; -import org.onap.policy.controlloop.ControlLoopEvent; -import org.onap.policy.controlloop.ControlLoopException; -import org.onap.policy.controlloop.ControlLoopOperation; -import org.onap.policy.controlloop.ControlLoopResponse; -import org.onap.policy.controlloop.VirtualControlLoopEvent; -import org.onap.policy.controlloop.actor.appc.AppcActor; -import org.onap.policy.controlloop.actor.appclcm.AppcLcmActor; -import org.onap.policy.controlloop.actor.cds.CdsActor; -import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants; -import org.onap.policy.controlloop.actor.sdnc.SdncActor; -import org.onap.policy.controlloop.actor.sdnr.SdnrActor; -import org.onap.policy.controlloop.actor.so.SoActor; -import org.onap.policy.controlloop.actor.vfc.VfcActor; -import org.onap.policy.controlloop.policy.Policy; -import org.onap.policy.controlloop.policy.PolicyResult; -import org.onap.policy.controlloop.policy.TargetType; -import org.onap.policy.drools.system.PolicyEngineConstants; -import org.onap.policy.guard.OperationsHistory; -import org.onap.policy.guard.Util; -import org.onap.policy.sdnc.SdncResponse; -import org.onap.policy.sdnr.PciResponseWrapper; -import org.onap.policy.so.SoResponseWrapper; -import org.onap.policy.vfc.VfcResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ControlLoopOperationManager implements Serializable { - private static final String SUCCESS_MSG = " Success"; - private static final String FAILED_MSG = " Failed"; - private static final long serialVersionUID = -3773199283624595410L; - private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager.class); - - private static final String VSERVER_VSERVER_NAME = "vserver.vserver-name"; - private static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name"; - private static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id"; - - private static final String AAI_SERVICE_INSTANCE_ID_KEY = "service-instance.service-instance-id"; - private static final String PNF_NAME = "pnf.pnf-name"; - - // - // These properties are not changeable, but accessible - // for Drools Rule statements. - // - public final ControlLoopEvent onset; - public final Policy policy; - - // - // Properties used to track the Operation - // - private int attempts = 0; - private Operation currentOperation = null; - private LinkedList<Operation> operationHistory = new LinkedList<>(); - private PolicyResult policyResult = null; - private ControlLoopEventManager eventManager = null; - private String targetEntity; - private String guardApprovalStatus = "NONE"; // "NONE", "PERMIT", "DENY" - private AaiCqResponse aaiCqResponse; - private transient Object operationRequest; - - /** - * Construct an instance. - * - * @param onset the onset event - * @param policy the policy - * @param em the event manager - * @throws ControlLoopException if an error occurs - */ - public ControlLoopOperationManager(ControlLoopEvent onset, Policy policy, - ControlLoopEventManager em) throws ControlLoopException { - - this.onset = onset; - this.policy = policy; - this.guardApprovalStatus = "NONE"; - this.eventManager = em; - - try { - if (TargetType.VNF.equals(policy.getTarget().getType()) - || TargetType.VFMODULE.equals(policy.getTarget().getType())) { - this.aaiCqResponse = - this.eventManager.getCqResponse((VirtualControlLoopEvent) onset); - } - - this.targetEntity = getTarget(policy); - - initActor(policy); - - } catch (AaiException e) { - throw new ControlLoopException(e.getMessage(), e); - } - } - - private void initActor(Policy policy) throws AaiException, ControlLoopException { - // - // Let's make a sanity check - // - switch (policy.getActor()) { - case "APPC": - initAppc(policy); - break; - case "SO": - break; - case "SDNR": - break; - case "VFC": - break; - case "SDNC": - break; - case "CDS": - break; - default: - throw new ControlLoopException( - "ControlLoopEventManager: policy has an unknown actor."); - } - } - - private void initAppc(Policy policy) throws AaiException { - if ("ModifyConfig".equalsIgnoreCase(policy.getRecipe())) { - /* - * The target vnf-id may not be the same as the source vnf-id specified in the yaml, the - * target - * vnf-id is retrieved by a named query to A&AI. - */ - GenericVnf genvnf = this.aaiCqResponse - .getGenericVnfByModelInvariantId(policy.getTarget().getResourceID()); - if (genvnf == null) { - logger.info("Target entity could not be found"); - throw new AaiException("Target vnf-id could not be found"); - } - this.targetEntity = genvnf.getVnfId(); - - } - } - - public ControlLoopEventManager getEventManager() { - return eventManager; - } - - public void setEventManager(ControlLoopEventManager eventManager) { - this.eventManager = eventManager; - } - - public String getTargetEntity() { - return this.targetEntity; - } - - @Override - public String toString() { - return "ControlLoopOperationManager [onset=" - + (onset != null ? onset.getRequestId() : "null") + ", policy=" - + (policy != null ? policy.getId() : "null") + ", attempts=" + attempts - + ", policyResult=" + policyResult + ", currentOperation=" + currentOperation - + ", operationHistory=" + operationHistory + "]"; - } - - // - // Internal class used for tracking - // - private class Operation implements Serializable { - private static final long serialVersionUID = 1L; - - private ControlLoopOperation clOperation = new ControlLoopOperation(); - private PolicyResult policyResult = null; - private int attempt = 0; - - @Override - public String toString() { - return "Operation [attempt=" + attempt + ", policyResult=" + policyResult - + ", operation=" + clOperation + "]"; - } - } - - public Object getOperationRequest() { - return operationRequest; - } - - public String getGuardApprovalStatus() { - return guardApprovalStatus; - } - - public void setGuardApprovalStatus(String guardApprovalStatus) { - this.guardApprovalStatus = guardApprovalStatus; - } - - /** - * Get the target for a policy. - * - * @param policy the policy - * @return the target - * @throws ControlLoopException if an error occurs - */ - public String getTarget(Policy policy) throws ControlLoopException { - if (policy.getTarget() == null) { - throw new ControlLoopException("The target is null"); - } - - if (policy.getTarget().getType() == null) { - throw new ControlLoopException("The target type is null"); - } - - switch (policy.getTarget().getType()) { - case PNF: - return getPnfTarget(); - case VM: - case VNF: - return getVfModuleTarget(); - case VFMODULE: - return getVfModuleTarget(); - default: - throw new ControlLoopException("The target type is not supported"); - } - } - - private String getVfModuleTarget() throws ControlLoopException { - VirtualControlLoopEvent virtualOnsetEvent = (VirtualControlLoopEvent) this.onset; - if (this.onset.getTarget().equalsIgnoreCase(VSERVER_VSERVER_NAME)) { - return virtualOnsetEvent.getAai().get(VSERVER_VSERVER_NAME); - } else if (this.onset.getTarget().equalsIgnoreCase(GENERIC_VNF_VNF_ID)) { - return virtualOnsetEvent.getAai().get(GENERIC_VNF_VNF_ID); - } else if (this.onset.getTarget().equalsIgnoreCase(GENERIC_VNF_VNF_NAME)) { - /* - * If the onset is enriched with the vnf-id, we don't need an A&AI response - */ - if (virtualOnsetEvent.getAai().containsKey(GENERIC_VNF_VNF_ID)) { - return virtualOnsetEvent.getAai().get(GENERIC_VNF_VNF_ID); - } - - /* - * If the vnf-name was retrieved from the onset then the vnf-id must be obtained from - * the event - * manager's A&AI GET query - */ - try { - String vnfId = this.aaiCqResponse.getDefaultGenericVnf().getVnfId(); - if (vnfId == null) { - throw new AaiException("No vnf-id found"); - } - return vnfId; - - } catch (AaiException e) { - throw new ControlLoopException(e.getMessage(), e); - } - } - throw new ControlLoopException("Target does not match target type"); - } - - private String getPnfTarget() throws ControlLoopException { - VirtualControlLoopEvent virtualOnsetEvent = (VirtualControlLoopEvent) this.onset; - if (!PNF_NAME.equalsIgnoreCase(onset.getTarget())) { - throw new ControlLoopException( - "Target in the onset event is either null or does not match target key expected in AAI section."); - } - return virtualOnsetEvent.getAai().get(PNF_NAME); - } - - /** - * Start an operation. - * - * @param onset the onset event - * @return the operation request - * @throws ControlLoopException if an error occurs - */ - public Object startOperation(/* VirtualControlLoopEvent */ControlLoopEvent onset) - throws ControlLoopException { - verifyOperatonCanRun(); - - // - // Setup - // - this.policyResult = null; - Operation operation = new Operation(); - operation.attempt = ++this.attempts; - operation.clOperation.setActor(this.policy.getActor()); - operation.clOperation.setOperation(this.policy.getRecipe()); - operation.clOperation.setTarget(this.policy.getTarget().toString()); - operation.clOperation.setSubRequestId(Integer.toString(operation.attempt)); - // - // Now determine which actor we need to construct a request for - // - try { - switch (policy.getActor()) { - case "APPC": - return startAppcOperation(onset, operation); - case "SO": - return startSoOperation(onset, operation); - case "VFC": - return startVfcOperation(onset, operation); - case "SDNR": - return startSdnrOperation(onset, operation); - case "SDNC": - return startSdncOperation(onset, operation); - case "CDS": - return startCdsOperation(onset, operation); - default: - throw new ControlLoopException( - "invalid actor " + policy.getActor() + " on policy"); - } - - } catch (AaiException e) { - throw new ControlLoopException(e.getMessage(), e); - } - } - - private Object startAppcOperation(ControlLoopEvent onset, Operation operation) { - /* - * If the recipe is ModifyConfig, a legacy APPC request is constructed. Otherwise an - * LCMRequest is - * constructed. - */ - this.currentOperation = operation; - if ("ModifyConfig".equalsIgnoreCase(policy.getRecipe())) { - this.operationRequest = - AppcActor.constructRequest((VirtualControlLoopEvent) onset, - operation.clOperation, this.policy, this.targetEntity); - } else { - this.operationRequest = - AppcLcmActor.constructRequest((VirtualControlLoopEvent) onset, - operation.clOperation, this.policy, this.targetEntity); - } - // - // Save the operation - // - - return operationRequest; - } - - private Object startSoOperation(ControlLoopEvent onset, Operation operation) { - SoActor soActorSp = new SoActor(); - this.operationRequest = soActorSp.constructRequestCq((VirtualControlLoopEvent) onset, - operation.clOperation, this.policy, this.aaiCqResponse); - - // Save the operation - this.currentOperation = operation; - - if (this.operationRequest == null) { - this.policyResult = PolicyResult.FAILURE; - } - - return operationRequest; - } - - private Object startVfcOperation(ControlLoopEvent onset, Operation operation) { - this.operationRequest = - VfcActor.constructRequestCq((VirtualControlLoopEvent) onset, - operation.clOperation, this.policy, this.aaiCqResponse); - this.currentOperation = operation; - if (this.operationRequest == null) { - this.policyResult = PolicyResult.FAILURE; - } - return operationRequest; - } - - private Object startSdnrOperation(ControlLoopEvent onset, Operation operation) { - /* - * If the recipe is ModifyConfig or ModifyConfigANR, a SDNR request is constructed. - */ - this.currentOperation = operation; - this.operationRequest = SdnrActor - .constructRequest((VirtualControlLoopEvent) onset, operation.clOperation, this.policy); - // - // Save the operation - // - if (this.operationRequest == null) { - this.policyResult = PolicyResult.FAILURE; - } - - return operationRequest; - } - - private Object startSdncOperation(ControlLoopEvent onset, Operation operation) { - SdncActor provider = new SdncActor(); - this.operationRequest = provider.constructRequest((VirtualControlLoopEvent) onset, - operation.clOperation, this.policy); - this.currentOperation = operation; - if (this.operationRequest == null) { - this.policyResult = PolicyResult.FAILURE; - } - return operationRequest; - } - - private Object startCdsOperation(ControlLoopEvent onset, Operation operation) - throws AaiException { - - CdsActor provider = new CdsActor(); - Optional<ExecutionServiceInput> optionalRequest = - provider.constructRequest((VirtualControlLoopEvent) onset, operation.clOperation, - this.policy, this.buildAaiParams()); - - this.currentOperation = operation; - if (optionalRequest.isPresent()) { - this.operationRequest = optionalRequest.get(); - } else { - this.operationRequest = null; - this.policyResult = PolicyResult.FAILURE; - } - - return this.operationRequest; - } - - /** - * Build AAI parameters for CDS operation. - * - * @return a map containing vnf id key and value for the vnf to apply the action to. - * @throws AaiException if the vnf can not be found. - */ - private Map<String, String> buildAaiParams() throws AaiException { - - Map<String, String> result = new HashMap<>(); - - if (TargetType.VNF.equals(policy.getTarget().getType()) - || TargetType.VFMODULE.equals(policy.getTarget().getType())) { - - ServiceInstance serviceInstance = this.aaiCqResponse.getServiceInstance(); - if (serviceInstance == null) { - logger.info("Target entity service instance could not be found"); - throw new AaiException("Target service instance could not be found"); - } - - GenericVnf genericVnf = this.aaiCqResponse - .getGenericVnfByModelInvariantId(policy.getTarget().getResourceID()); - if (genericVnf == null) { - logger.info("Target entity generic vnf could not be found"); - throw new AaiException("Target generic vnf could not be found"); - } - - result.put(AAI_SERVICE_INSTANCE_ID_KEY, serviceInstance.getServiceInstanceId()); - result.put(GENERIC_VNF_VNF_ID, genericVnf.getVnfId()); - - } else if (TargetType.PNF.equals(policy.getTarget().getType())) { - result = this.eventManager.getPnf((VirtualControlLoopEvent) onset); - } - - return result; - - } - - /** - * Handle a response. - * - * @param response the response - * @return a PolicyResult - */ - public PolicyResult onResponse(Object response) { - // - // Which response is it? - // - if (response instanceof Response) { - // - // Cast APPC response and handle it - // - return onResponse((Response) response); - } else if (response instanceof AppcLcmDmaapWrapper) { - // - // Cast LCM response and handle it - // - return onResponse((AppcLcmDmaapWrapper) response); - } else if (response instanceof PciResponseWrapper) { - // - // Cast SDNR response and handle it - // - return onResponse((PciResponseWrapper) response); - } else if (response instanceof SoResponseWrapper) { - // - // Cast SO response and handle it - // - return onResponse((SoResponseWrapper) response); - } else if (response instanceof VfcResponse) { - // - // Cast VFC response and handle it - // - return onResponse((VfcResponse) response); - } else if (response instanceof SdncResponse) { - // - // Cast SDNC response and handle it - // - return onResponse((SdncResponse) response); - } else if (response instanceof CdsResponse) { - // - // Cast CDS response and handle it - // - return onResponse((CdsResponse) response); - } else { - return null; - } - } - - /** - * This method handles operation responses from APPC. - * - * @param appcResponse the APPC response - * @return The result of the response handling - */ - private PolicyResult onResponse(Response appcResponse) { - // - // Determine which subrequestID (ie. attempt) - // - Integer operationAttempt = getSubRequestId(appcResponse); - if (operationAttempt == null) { - this.completeOperation(operationAttempt, - "Policy was unable to parse APP-C SubRequestID (it was null).", - PolicyResult.FAILURE_EXCEPTION); - return PolicyResult.FAILURE_EXCEPTION; - } - // - // Sanity check the response message - // - if (appcResponse.getStatus() == null) { - // - // We cannot tell what happened if this doesn't exist - // - this.completeOperation(operationAttempt, - "Policy was unable to parse APP-C response status field (it was null).", - PolicyResult.FAILURE_EXCEPTION); - return PolicyResult.FAILURE_EXCEPTION; - } - // - // Get the Response Code - // - ResponseCode code = ResponseCode.toResponseCode(appcResponse.getStatus().getCode()); - if (code == null) { - // - // We are unaware of this code - // - this.completeOperation(operationAttempt, - "Policy was unable to parse APP-C response status code field.", - PolicyResult.FAILURE_EXCEPTION); - return PolicyResult.FAILURE_EXCEPTION; - } - - return onResponse(appcResponse, operationAttempt, code); - } - - private PolicyResult onResponse(Response appcResponse, Integer operationAttempt, - ResponseCode code) { - // - // Ok, let's figure out what APP-C's response is - // - switch (code) { - case ACCEPT: - // - // This is good, they got our original message and - // acknowledged it. - // - // Is there any need to track this? - // - return null; - case ERROR: - case REJECT: - // - // We'll consider these two codes as exceptions - // - this.completeOperation(operationAttempt, appcResponse.getStatus().getDescription(), - PolicyResult.FAILURE_EXCEPTION); - return getTimeoutResult(PolicyResult.FAILURE_EXCEPTION); - case SUCCESS: - // - // - // - this.completeOperation(operationAttempt, appcResponse.getStatus().getDescription(), - PolicyResult.SUCCESS); - return getTimeoutResult(PolicyResult.SUCCESS); - case FAILURE: - // - // - // - this.completeOperation(operationAttempt, appcResponse.getStatus().getDescription(), - PolicyResult.FAILURE); - return getTimeoutResult(PolicyResult.FAILURE); - default: - return null; - } - } - - /** - * This method handles operation responses from LCM. - * - * @param dmaapResponse the LCM response - * @return The result of the response handling - */ - private PolicyResult onResponse(AppcLcmDmaapWrapper dmaapResponse) { - /* - * Parse out the operation attempt using the subrequestid - */ - Integer operationAttempt = AppcLcmActor.parseOperationAttempt( - dmaapResponse.getBody().getOutput().getCommonHeader().getSubRequestId()); - if (operationAttempt == null) { - this.completeOperation(operationAttempt, - "Policy was unable to parse APP-C SubRequestID (it was null).", - PolicyResult.FAILURE_EXCEPTION); - return PolicyResult.FAILURE_EXCEPTION; - } - - /* - * Process the APPCLCM response to see what PolicyResult should be returned - */ - AbstractMap.SimpleEntry<PolicyResult, String> result = - AppcLcmActor.processResponse(dmaapResponse); - - if (result.getKey() != null) { - this.completeOperation(operationAttempt, result.getValue(), result.getKey()); - if (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult)) { - return null; - } - return result.getKey(); - } - return null; - } - - /** - * This method handles operation responses from SDNR. - * - * @param dmaapResponse the SDNR response - * @return the result of the response handling - */ - private PolicyResult onResponse(PciResponseWrapper dmaapResponse) { - /* - * Parse out the operation attempt using the subrequestid - */ - Integer operationAttempt = SdnrActor - .parseOperationAttempt(dmaapResponse.getBody().getCommonHeader().getSubRequestId()); - if (operationAttempt == null) { - this.completeOperation(operationAttempt, - "Policy was unable to parse SDNR SubRequestID.", PolicyResult.FAILURE_EXCEPTION); - return PolicyResult.FAILURE_EXCEPTION; - } - - /* - * Process the SDNR response to see what PolicyResult should be returned - */ - Pair<PolicyResult, String> result = - SdnrActor.processResponse(dmaapResponse); - - if (result.getLeft() != null) { - this.completeOperation(operationAttempt, result.getRight(), result.getLeft()); - if (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult)) { - return null; - } - return result.getLeft(); - } - return null; - } - - /** - * This method handles operation responses from SO. - * - * @param msoResponse the SO response - * @return The result of the response handling - */ - private PolicyResult onResponse(SoResponseWrapper msoResponse) { - switch (msoResponse.getSoResponse().getHttpResponseCode()) { - case 200: - case 202: - // - // Consider it as success - // - this.completeOperation(this.attempts, - msoResponse.getSoResponse().getHttpResponseCode() + SUCCESS_MSG, - PolicyResult.SUCCESS); - return getTimeoutResult(PolicyResult.SUCCESS); - default: - // - // Consider it as failure - // - this.completeOperation(this.attempts, - msoResponse.getSoResponse().getHttpResponseCode() + FAILED_MSG, - PolicyResult.FAILURE); - return getTimeoutResult(PolicyResult.FAILURE); - } - } - - /** - * This method handles operation responses from VFC. - * - * @param vfcResponse the VFC response - * @return The result of the response handling - */ - private PolicyResult onResponse(VfcResponse vfcResponse) { - if ("finished".equalsIgnoreCase(vfcResponse.getResponseDescriptor().getStatus())) { - // - // Consider it as success - // - this.completeOperation(this.attempts, SUCCESS_MSG, PolicyResult.SUCCESS); - return getTimeoutResult(PolicyResult.SUCCESS); - } else { - // - // Consider it as failure - // - this.completeOperation(this.attempts, FAILED_MSG, PolicyResult.FAILURE); - if (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult)) { - return null; - } - // increment operation attempts for retries - this.attempts += 1; - return PolicyResult.FAILURE; - } - } - - /** - * This method handles operation responses from SDNC. - * - * @param sdncResponse the VFC response - * @return The result of the response handling - */ - private PolicyResult onResponse(SdncResponse sdncResponse) { - if ("200".equals(sdncResponse.getResponseOutput().getResponseCode())) { - // - // Consider it as success - // - this.completeOperation(this.attempts, SUCCESS_MSG, PolicyResult.SUCCESS); - return getTimeoutResult(PolicyResult.SUCCESS); - } else { - // - // Consider it as failure - // - this.completeOperation(this.attempts, FAILED_MSG, PolicyResult.FAILURE); - if (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult)) { - return null; - } - // increment operation attempts for retries - this.attempts += 1; - return PolicyResult.FAILURE; - } - } - - /** - * This method handles operation responses from CDS. - * - * @param response the CDS response - * @return The result of the response handling - */ - private PolicyResult onResponse(CdsResponse response) { - if (response != null && CdsActorConstants.SUCCESS.equals(response.getStatus())) { - // - // Consider it as success - // - this.completeOperation(this.attempts, SUCCESS_MSG, PolicyResult.SUCCESS); - return getTimeoutResult(PolicyResult.SUCCESS); - } else { - // - // Consider it as failure - // - this.completeOperation(this.attempts, FAILED_MSG, PolicyResult.FAILURE); - if (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult)) { - return null; - } - // increment operation attempts for retries - this.attempts += 1; - return PolicyResult.FAILURE; - } - } - - private PolicyResult getTimeoutResult(PolicyResult result) { - return (PolicyResult.FAILURE_TIMEOUT.equals(this.policyResult) ? null : result); - } - - private Integer getSubRequestId(Response appcResponse) { - try { - return Integer.valueOf(appcResponse.getCommonHeader().getSubRequestId()); - } catch (NumberFormatException e) { - // - // We cannot tell what happened if this doesn't exist - // - return null; - } - } - - /** - * Get the operation timeout. - * - * @return the timeout - */ - public Integer getOperationTimeout() { - // - // Sanity check - // - if (this.policy == null) { - logger.debug("getOperationTimeout returning 0"); - return 0; - } - logger.debug("getOperationTimeout returning {}", this.policy.getTimeout()); - return this.policy.getTimeout(); - } - - /** - * Get the operation timeout as a String. - * - * @param defaultTimeout the default timeout - * @return the timeout as a String - */ - public String getOperationTimeoutString(int defaultTimeout) { - Integer to = this.getOperationTimeout(); - if (to == null || to == 0) { - return Integer.toString(defaultTimeout) + "s"; - } - return to.toString() + "s"; - } - - public PolicyResult getOperationResult() { - return this.policyResult; - } - - /** - * Get the operation as a message. - * - * @return the operation as a message - */ - public String getOperationMessage() { - if (this.currentOperation != null && this.currentOperation.clOperation != null) { - return this.currentOperation.clOperation.toMessage(); - } - - if (!this.operationHistory.isEmpty()) { - return this.operationHistory.getLast().clOperation.toMessage(); - } - return null; - } - - /** - * Get the operation as a message including the guard result. - * - * @param guardResult the guard result - * @return the operation as a message including the guard result - */ - public String getOperationMessage(String guardResult) { - if (this.currentOperation != null && this.currentOperation.clOperation != null) { - return this.currentOperation.clOperation.toMessage() + ", Guard result: " + guardResult; - } - - if (!this.operationHistory.isEmpty()) { - return this.operationHistory.getLast().clOperation.toMessage() + ", Guard result: " - + guardResult; - } - return null; - } - - /** - * Get the operation history. - * - * @return the operation history - */ - public String getOperationHistory() { - if (this.currentOperation != null && this.currentOperation.clOperation != null) { - return this.currentOperation.clOperation.toHistory(); - } - - if (!this.operationHistory.isEmpty()) { - return this.operationHistory.getLast().clOperation.toHistory(); - } - return null; - } - - /** - * Get the history. - * - * @return the list of control loop operations - */ - public List<ControlLoopOperation> getHistory() { - LinkedList<ControlLoopOperation> history = new LinkedList<>(); - for (Operation op : this.operationHistory) { - history.add(new ControlLoopOperation(op.clOperation)); - - } - return history; - } - - /** - * Set the operation has timed out. - */ - public void setOperationHasTimedOut() { - // - // - // - this.completeOperation(this.attempts, "Operation timed out", PolicyResult.FAILURE_TIMEOUT); - } - - /** - * Set the operation has been denied by guard. - */ - public void setOperationHasGuardDeny() { - // - // - // - this.completeOperation(this.attempts, "Operation denied by Guard", - PolicyResult.FAILURE_GUARD); - } - - public void setOperationHasException(String message) { - this.completeOperation(this.attempts, message, PolicyResult.FAILURE_EXCEPTION); - } - - /** - * Is the operation complete. - * - * @return <code>true</code> if the operation is complete, <code>false</code> otherwise - */ - public boolean isOperationComplete() { - // - // Is there currently a result? - // - if (this.policyResult == null) { - // - // either we are in process or we - // haven't started - // - return false; - } - // - // We have some result, check if the operation failed - // - if (this.policyResult.equals(PolicyResult.FAILURE)) { - // - // Check if there were no retries specified - // - if (getMaxRetries() < 1) { - // - // The result is the failure - // - return true; - } - // - // Check retries - // - if (this.attempts > getMaxRetries()) { - // - // No more attempts allowed, reset - // that our actual result is failure due to retries - // - this.policyResult = PolicyResult.FAILURE_RETRIES; - return true; - } else { - // - // There are more attempts available to try the - // policy recipe. - // - return false; - } - } - // - // Other results mean we are done - // - return true; - } - - public boolean isOperationRunning() { - return (this.currentOperation != null); - } - - /** - * This method verifies that the operation manager may run an operation. - * - * @return True if the operation can run, false otherwise - * @throws ControlLoopException if the operation cannot run - */ - private void verifyOperatonCanRun() throws ControlLoopException { - // - // They shouldn't call us if we currently running something - // - if (this.currentOperation != null) { - // - // what do we do if we are already running an operation? - // - throw new ControlLoopException( - "current operation is not null (an operation is already running)"); - } - // - // Check if we have maxed out on retries - // - if (getMaxRetries() < 1) { - // - // No retries are allowed, so check have we even made - // one attempt to execute the operation? - // - if (this.attempts >= 1) { - // - // We have, let's ensure our PolicyResult is set - // - if (this.policyResult == null) { - this.policyResult = PolicyResult.FAILURE_RETRIES; - } - // - // - // - throw new ControlLoopException( - "current operation failed and retries are not allowed"); - } - } else { - // - // Have we maxed out on retries? - // - if (this.attempts > getMaxRetries()) { - if (this.policyResult == null) { - this.policyResult = PolicyResult.FAILURE_RETRIES; - } - throw new ControlLoopException( - "current oepration has failed after " + this.attempts + " retries"); - } - } - } - - /** - * Gets the maximum number of retries. - * - * @return the maximum number of retries, or {@code 0}, if not specified - */ - public int getMaxRetries() { - return (policy.getRetry() != null ? policy.getRetry() : 0); - } - - private void storeOperationInDataBase() { - // Only store in DB if enabled - boolean guardEnabled = "false".equalsIgnoreCase( - PolicyEngineConstants.getManager().getEnvironmentProperty("guard.disabled")); - if (!guardEnabled) { - return; - } - - // DB Properties - Properties props = new Properties(); - if (PolicyEngineConstants.getManager().getEnvironmentProperty(Util.ONAP_KEY_URL) != null - && PolicyEngineConstants.getManager().getEnvironmentProperty(Util.ONAP_KEY_USER) != null - && PolicyEngineConstants.getManager() - .getEnvironmentProperty(Util.ONAP_KEY_PASS) != null) { - props.put(Util.ECLIPSE_LINK_KEY_URL, - PolicyEngineConstants.getManager().getEnvironmentProperty(Util.ONAP_KEY_URL)); - props.put(Util.ECLIPSE_LINK_KEY_USER, - PolicyEngineConstants.getManager().getEnvironmentProperty(Util.ONAP_KEY_USER)); - props.put(Util.ECLIPSE_LINK_KEY_PASS, - PolicyEngineConstants.getManager().getEnvironmentProperty(Util.ONAP_KEY_PASS)); - props.put(PersistenceUnitProperties.CLASSLOADER, - ControlLoopOperationManager.class.getClassLoader()); - } - - String opsHistPu = System.getProperty("OperationsHistoryPU"); - if (!"OperationsHistoryPUTest".equals(opsHistPu)) { - opsHistPu = "OperationsHistoryPU"; - } else { - props.clear(); - } - EntityManager em; - try { - em = Persistence.createEntityManagerFactory(opsHistPu, props).createEntityManager(); - } catch (Exception e) { - logger.error("storeOperationInDataBase threw: ", e); - return; - } - - OperationsHistory newEntry = new OperationsHistory(); - - newEntry.setClosedLoopName(this.onset.getClosedLoopControlName()); - newEntry.setRequestId(this.onset.getRequestId().toString()); - newEntry.setActor(this.currentOperation.clOperation.getActor()); - newEntry.setOperation(this.currentOperation.clOperation.getOperation()); - newEntry.setTarget(this.targetEntity); - newEntry.setStarttime(Timestamp.from(this.currentOperation.clOperation.getStart())); - newEntry.setSubrequestId(this.currentOperation.clOperation.getSubRequestId()); - newEntry - .setEndtime(new Timestamp(this.currentOperation.clOperation.getEnd().toEpochMilli())); - newEntry.setMessage(this.currentOperation.clOperation.getMessage()); - newEntry.setOutcome(this.currentOperation.clOperation.getOutcome()); - - em.getTransaction().begin(); - em.persist(newEntry); - em.getTransaction().commit(); - - em.close(); - } - - private void completeOperation(Integer attempt, String message, PolicyResult result) { - if (attempt == null) { - logger.debug("attempt cannot be null (i.e. subRequestID)"); - return; - } - if (this.currentOperation != null) { - if (this.currentOperation.attempt == attempt.intValue()) { - this.currentOperation.clOperation.setEnd(Instant.now()); - this.currentOperation.clOperation.setMessage(message); - this.currentOperation.clOperation.setOutcome(result.toString()); - this.currentOperation.policyResult = result; - // - // Save it in history - // - this.operationHistory.add(this.currentOperation); - this.storeOperationInDataBase(); - // - // Set our last result - // - this.policyResult = result; - // - // Clear the current operation field - // - this.currentOperation = null; - return; - } - logger.debug("not current"); - } - for (Operation op : this.operationHistory) { - if (op.attempt == attempt.intValue()) { - op.clOperation.setEnd(Instant.now()); - op.clOperation.setMessage(message); - op.clOperation.setOutcome(result.toString()); - op.policyResult = result; - return; - } - } - logger.debug("Could not find associated operation"); - } - - /** - * Commit the abatement to the history database. - * - * @param message the abatement message - * @param outcome the abatement outcome - */ - public void commitAbatement(String message, String outcome) { - logger.info("commitAbatement: {}. {}", message, outcome); - - if (this.currentOperation == null) { - try { - this.currentOperation = this.operationHistory.getLast(); - } catch (NoSuchElementException e) { - logger.error("{}: commitAbatement threw an exception ", this, e); - return; - } - } - this.currentOperation.clOperation.setEnd(Instant.now()); - this.currentOperation.clOperation.setMessage(message); - this.currentOperation.clOperation.setOutcome(outcome); - // - // Store commit in DB - // - this.storeOperationInDataBase(); - // - // Clear the current operation field - // - this.currentOperation = null; - } - - /** - * Construct a ControlLoopResponse object from actor response and input event. - * - * @param response the response from actor - * @param event the input event - * - * @return a ControlLoopResponse - */ - public ControlLoopResponse getControlLoopResponse(Object response, - VirtualControlLoopEvent event) { - if (response instanceof PciResponseWrapper) { - // - // Cast SDNR response and handle it - // - return SdnrActor.getControlLoopResponse((PciResponseWrapper) response, - event); - } else { - return null; - } - } - -} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockCallbackWorkingMemory.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockCallbackWorkingMemory.java deleted file mode 100644 index 738d3b922..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/LockCallbackWorkingMemory.java +++ /dev/null @@ -1,80 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.controlloop.eventmanager; - -import lombok.Getter; -import org.drools.core.WorkingMemory; -import org.kie.api.runtime.rule.FactHandle; -import org.onap.policy.drools.core.lock.Lock; -import org.onap.policy.drools.core.lock.LockCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Lock call-back that updates working memory. - */ -@Getter -public class LockCallbackWorkingMemory implements LockCallback { - private static final Logger logger = LoggerFactory.getLogger(LockCallbackWorkingMemory.class); - - /** - * Name to be logged when the lock is updated. - */ - private final String name; - - /** - * Working memory to be updated when the lock is notified. - */ - private final WorkingMemory workingMemory; - - - /** - * Constructs the object. - * - * @param name name to be logged when the lock is updated - * @param workingMemory working memory to be updated when the lock is notified - */ - public LockCallbackWorkingMemory(String name, WorkingMemory workingMemory) { - this.name = name; - this.workingMemory = workingMemory; - } - - @Override - public void lockAvailable(Lock lock) { - notifySession(lock); - } - - @Override - public void lockUnavailable(Lock lock) { - notifySession(lock); - } - - /** - * Notifies the session that the lock has been updated. - */ - private void notifySession(Lock lock) { - FactHandle fact = workingMemory.getFactHandle(lock); - if (fact != null) { - logger.debug("{}: updating lock={}", name, lock); - workingMemory.update(fact, lock); - } - } -} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java index 741ce20f8..f2feef666 100644 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ophistory/OperationHistoryDataManagerImpl.java @@ -40,7 +40,6 @@ import org.onap.policy.common.utils.jpa.EntityTransCloser; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.guard.OperationsHistory; -import org.onap.policy.guard.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,9 +321,9 @@ public class OperationHistoryDataManagerImpl implements OperationHistoryDataMana */ private Properties toProperties(OperationHistoryDataManagerParams params) { Properties props = new Properties(); - props.put(Util.ECLIPSE_LINK_KEY_URL, params.getUrl()); - props.put(Util.ECLIPSE_LINK_KEY_USER, params.getUserName()); - props.put(Util.ECLIPSE_LINK_KEY_PASS, params.getPassword()); + props.put(PersistenceUnitProperties.JDBC_URL, params.getUrl()); + props.put(PersistenceUnitProperties.JDBC_USER, params.getUserName()); + props.put(PersistenceUnitProperties.JDBC_PASSWORD, params.getPassword()); props.put(PersistenceUnitProperties.CLASSLOADER, getClass().getClassLoader()); return props; diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java deleted file mode 100644 index 7e4c10749..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java +++ /dev/null @@ -1,26 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy engine - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools; - -@FunctionalInterface -public interface PolicyEngine { - public boolean deliver(String busType, String topic, Object obj); -} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngineListener.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngineListener.java deleted file mode 100644 index 419c31094..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngineListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy engine - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools; - -@FunctionalInterface -public interface PolicyEngineListener { - /** - * Any class that implements this interface will be notified of a new event on the queue in the - * PolicyEngineJUnitImpl. - * - * @param topic a key to the queue that contains the event - */ - public void newEventNotification(String topic); -} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java deleted file mode 100644 index 85fc13c9e..000000000 --- a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java +++ /dev/null @@ -1,147 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * policy engine - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.drools.impl; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import org.onap.policy.appc.Request; -import org.onap.policy.appclcm.AppcLcmDmaapWrapper; -import org.onap.policy.controlloop.ControlLoopNotification; -import org.onap.policy.controlloop.util.Serialization; -import org.onap.policy.drools.PolicyEngine; -import org.onap.policy.drools.PolicyEngineListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PolicyEngineJUnitImpl implements PolicyEngine { - - private static final Logger logger = LoggerFactory.getLogger(PolicyEngineJUnitImpl.class); - private Map<String, Map<String, Queue<Object>>> busMap = new HashMap<>(); - private List<PolicyEngineListener> listeners = new ArrayList<>(); - - /** - * Adds all objects that implement PolicyEngineListener to the notification list when an event - * occurs. - * - * @param listener an object that is interest in knowing about events published to the - * PolicyEngine - */ - public void addListener(PolicyEngineListener listener) { - listeners.add(listener); - } - - /** - * Notifies all listeners about a new event. - * - * @param topic the topic in which the notification was sent to - */ - public void notifyListeners(String topic) { - for (PolicyEngineListener listener : listeners) { - listener.newEventNotification(topic); - } - } - - @Override - public boolean deliver(String busType, String topic, Object obj) { - if (obj instanceof ControlLoopNotification) { - ControlLoopNotification notification = (ControlLoopNotification) obj; - if (logger.isDebugEnabled()) { - logger.debug(Serialization.gsonPretty.toJson(notification)); - } - } - if (obj instanceof Request) { - Request request = (Request) obj; - logger.debug("Request: {} subrequest {}", request.getAction(), request.getCommonHeader().getSubRequestId()); - } else if (obj instanceof AppcLcmDmaapWrapper) { - AppcLcmDmaapWrapper dmaapRequest = (AppcLcmDmaapWrapper) obj; - logger.debug("Request: {} subrequest {}", dmaapRequest.getBody().getInput().getAction(), - dmaapRequest.getBody().getInput().getCommonHeader().getSubRequestId()); - } - // - // Does the bus exist? - // - if (!busMap.containsKey(busType)) { - logger.debug("creating new bus type {}", busType); - // - // Create the bus - // - busMap.put(busType, new HashMap<>()); - } - // - // Get the bus - // - Map<String, Queue<Object>> topicMap = busMap.get(busType); - // - // Does the topic exist? - // - if (!topicMap.containsKey(topic)) { - logger.debug("creating new topic {}", topic); - // - // Create the topic - // - topicMap.put(topic, new LinkedList<>()); - } - // - // Get the topic queue - // - logger.debug("queueing"); - boolean res = topicMap.get(topic).add(obj); - notifyListeners(topic); - return res; - } - - /** - * Subscribe to a topic on a bus. - * - * @param busType the bus type - * @param topic the topic - * @return the head of the queue, or <code>null</code> if the queue or bus does not exist or the - * queue is empty - */ - public Object subscribe(String busType, String topic) { - // - // Does the bus exist? - // - if (busMap.containsKey(busType)) { - // - // Get the bus - // - Map<String, Queue<Object>> topicMap = busMap.get(busType); - // - // Does the topic exist? - // - if (topicMap.containsKey(topic)) { - logger.debug("The queue has {}", topicMap.get(topic).size()); - return topicMap.get(topic).poll(); - } else { - logger.error("No topic exists {}", topic); - } - } else { - logger.error("No bus exists {}", busType); - } - return null; - } - -} |