diff options
Diffstat (limited to 'controlloop/common/eventmanager/src/main/java')
11 files changed, 1630 insertions, 0 deletions
diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopException.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopException.java new file mode 100644 index 000000000..e828150a0 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop; + +public class ControlLoopException extends Exception { + + /** + * + */ + private static final long serialVersionUID = 6400725747325923701L; + + public ControlLoopException() { + super(); + } + + public ControlLoopException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public ControlLoopException(String message, Throwable cause) { + super(message, cause); + } + + public ControlLoopException(String message) { + super(message); + } + + public ControlLoopException(Throwable cause) { + super(cause); + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopLogger.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopLogger.java new file mode 100644 index 000000000..4495f2a71 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopLogger.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop; + +import java.lang.reflect.Constructor; + +public interface ControlLoopLogger { + + public void info(String... parameters); + + public void metrics(String... msgs); + + public void metrics(Object obj); + + public static class Factory { + + public ControlLoopLogger buildLogger(String className) { + try { + Constructor<?> constr = Class.forName(className).getConstructor(); + return (ControlLoopLogger) constr.newInstance(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("Cannot load class " + className); + } + } + + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopPublisher.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopPublisher.java new file mode 100644 index 000000000..3ed6f8d58 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/ControlLoopPublisher.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop; + +import java.lang.reflect.Constructor; + +public interface ControlLoopPublisher { + + public void publish(Object object); + + public static class Factory { + + public ControlLoopPublisher buildLogger(String className) { + try { + Constructor<?> constr = Class.forName(className).getConstructor(); + return (ControlLoopPublisher) constr.newInstance(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("Cannot load class " + className); + } + } + + } + + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java new file mode 100644 index 000000000..1892746f1 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopEventManager.java @@ -0,0 +1,572 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop event manager + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.UUID; + +import org.onap.policy.controlloop.ControlLoopEventStatus; +import org.onap.policy.controlloop.ControlLoopNotificationType; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; +import org.onap.policy.controlloop.VirtualControlLoopNotification; + +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.policy.FinalResult; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.processor.ControlLoopProcessor; +import org.onap.policy.guard.GuardResult; +import org.onap.policy.guard.LockCallback; +import org.onap.policy.guard.PolicyGuard; +import org.onap.policy.guard.PolicyGuard.LockResult; +import org.onap.policy.guard.TargetLock; + +public class ControlLoopEventManager implements LockCallback, Serializable { + + /** + * + */ + private static final long serialVersionUID = -1216568161322872641L; + public final String closedLoopControlName; + public final UUID requestID; + + private String controlLoopResult; + private ControlLoopProcessor processor = null; + private VirtualControlLoopEvent onset; + private Integer numOnsets = 0; + private Integer numAbatements = 0; + private VirtualControlLoopEvent abatement; + private FinalResult controlLoopTimedOut = null; + + private boolean isActivated = false; + private LinkedList<ControlLoopOperation> controlLoopHistory = new LinkedList<ControlLoopOperation>(); + private ControlLoopOperationManager currentOperation = null; + private TargetLock targetLock = null; + + private static Collection<String> requiredAAIKeys = new ArrayList<String>(); + static { + requiredAAIKeys.add("AICVServerSelfLink"); + requiredAAIKeys.add("AICIdentity"); + requiredAAIKeys.add("is_closed_loop_disabled"); + requiredAAIKeys.add("VM_NAME"); + } + + public ControlLoopEventManager(String closedLoopControlName, UUID requestID) { + this.closedLoopControlName = closedLoopControlName; + this.requestID = requestID; + } + + public String getControlLoopResult() { + return controlLoopResult; + } + + public void setControlLoopResult(String controlLoopResult) { + this.controlLoopResult = controlLoopResult; + } + + public Integer getNumOnsets() { + return numOnsets; + } + + public void setNumOnsets(Integer numOnsets) { + this.numOnsets = numOnsets; + } + + public Integer getNumAbatements() { + return numAbatements; + } + + public void setNumAbatements(Integer numAbatements) { + this.numAbatements = numAbatements; + } + + public boolean isActivated() { + return isActivated; + } + + public void setActivated(boolean isActivated) { + this.isActivated = isActivated; + } + + public VirtualControlLoopEvent getOnsetEvent() { + return this.onset; + } + + public VirtualControlLoopEvent getAbatementEvent() { + return this.abatement; + } + + public ControlLoopProcessor getProcessor() { + return this.processor; + } + + public VirtualControlLoopNotification activate(VirtualControlLoopEvent event) { + VirtualControlLoopNotification notification = new VirtualControlLoopNotification(event); + try { + // + // This method should ONLY be called ONCE + // + if (this.isActivated) { + throw new ControlLoopException("ControlLoopEventManager has already been activated."); + } + // + // Syntax check the event + // + checkEventSyntax(event); + // + // At this point we are good to go with this event + // + this.onset = event; + this.numOnsets = 1; + // + notification.notification = ControlLoopNotificationType.ACTIVE; + // + // Set ourselves as active + // + this.isActivated = true; + } catch (ControlLoopException e) { + notification.notification = ControlLoopNotificationType.REJECTED; + notification.message = e.getMessage(); + } + return notification; + } + + + + public VirtualControlLoopNotification activate(String yamlSpecification, VirtualControlLoopEvent event) { + VirtualControlLoopNotification notification = new VirtualControlLoopNotification(event); + try { + // + // This method should ONLY be called ONCE + // + if (this.isActivated) { + throw new ControlLoopException("ControlLoopEventManager has already been activated."); + } + // + // Syntax check the event + // + checkEventSyntax(event); + + // + // Check the YAML + // + if (yamlSpecification == null || yamlSpecification.length() < 1) { + throw new ControlLoopException("yaml specification is null or 0 length"); + } + String decodedYaml = null; + try { + decodedYaml = URLDecoder.decode(yamlSpecification, "UTF-8"); + if (decodedYaml != null && decodedYaml.length() > 0) { + yamlSpecification = decodedYaml; + } + } catch (UnsupportedEncodingException e) { + } + // + // Parse the YAML specification + // + this.processor = new ControlLoopProcessor(yamlSpecification); + + // + // At this point we are good to go with this event + // + this.onset = event; + this.numOnsets = 1; + // + // + // + notification.notification = ControlLoopNotificationType.ACTIVE; + // + // Set ourselves as active + // + this.isActivated = true; + } catch (ControlLoopException e) { + notification.notification = ControlLoopNotificationType.REJECTED; + notification.message = e.getMessage(); + } + return notification; + } + + public VirtualControlLoopNotification isControlLoopFinal() throws ControlLoopException { + // + // Check if they activated us + // + if (this.isActivated == false) { + throw new ControlLoopException("ControlLoopEventManager MUST be activated first."); + } + // + // Make sure we are expecting this call. + // + if (this.onset == null) { + throw new ControlLoopException("No onset event for ControlLoopEventManager."); + } + // + // Ok, start creating the notification + // + VirtualControlLoopNotification notification = new VirtualControlLoopNotification(this.onset); + // + // Check if the overall control loop has timed out + // + if (this.isControlLoopTimedOut()) { + // + // Yes we have timed out + // + notification.notification = ControlLoopNotificationType.FINAL_FAILURE; + notification.message = "Control Loop timed out"; + notification.history.addAll(this.controlLoopHistory); + return notification; + } + // + // Check if the current policy is Final + // + FinalResult result = this.processor.checkIsCurrentPolicyFinal(); + if (result == null) { + // + // we are not at a final result + // + return null; + } + + switch (result) { + case FINAL_FAILURE: + case FINAL_FAILURE_EXCEPTION: + case FINAL_FAILURE_RETRIES: + case FINAL_FAILURE_TIMEOUT: + case FINAL_FAILURE_GUARD: + notification.notification = ControlLoopNotificationType.FINAL_FAILURE; + break; + case FINAL_OPENLOOP: + notification.notification = ControlLoopNotificationType.FINAL_OPENLOOP; + break; + case FINAL_SUCCESS: + notification.notification = ControlLoopNotificationType.FINAL_SUCCESS; + break; + default: + return null; + } + // + // Be sure to add all the history + // + notification.history.addAll(this.controlLoopHistory); + return notification; + } + + public ControlLoopOperationManager processControlLoop() throws ControlLoopException { + // + // Check if they activated us + // + if (this.isActivated == false) { + throw new ControlLoopException("ControlLoopEventManager MUST be activated first."); + } + // + // Make sure we are expecting this call. + // + if (this.onset == null) { + throw new ControlLoopException("No onset event for ControlLoopEventManager."); + } + // + // Is there a current operation? + // + if (this.currentOperation != null) { + // + // Throw an exception, or simply return the current operation? + // + throw new ControlLoopException("Already working an Operation, do not call this method."); + } + // + // Ensure we are not FINAL + // + VirtualControlLoopNotification notification = this.isControlLoopFinal(); + if (notification != null) { + // + // This is weird, we require them to call the isControlLoopFinal() method first + // + // We should really abstract this and avoid throwing an exception, because it really + // isn't an exception. + // + throw new ControlLoopException("Control Loop is in FINAL state, do not call this method."); + } + // + // Not final so get the policy that needs to be worked on. + // + Policy policy = this.processor.getCurrentPolicy(); + if (policy == null) { + throw new ControlLoopException("ControlLoopEventManager: processor came upon null Policy."); + } + // + // And setup an operation + // + this.currentOperation = new ControlLoopOperationManager(this.onset, policy, this); + // + // Return it + // + return this.currentOperation; + } + + public void finishOperation(ControlLoopOperationManager operation) throws ControlLoopException { + // + // Verify we have a current operation + // + if (this.currentOperation != null) { + // + // Validate they are finishing the current operation + // PLD - this is simply comparing the policy. Do we want to equals the whole object? + // + if (this.currentOperation.policy.equals(operation.policy)) { + System.out.println("Finishing " + this.currentOperation.policy.recipe + " result is " + this.currentOperation.getOperationResult()); + // + // Save history + // + this.controlLoopHistory.addAll(this.currentOperation.getHistory()); + // + // Move to the next Policy + // + this.processor.nextPolicyForResult(this.currentOperation.getOperationResult()); + // + // Just null this out + // + this.currentOperation = null; + // + // TODO: Release our lock + // + return; + } + System.out.println("Cannot finish current operation " + this.currentOperation.policy + " does not match given operation " + operation.policy); + return; + } + throw new ControlLoopException("No operation to finish."); + } + + public synchronized LockResult<GuardResult, TargetLock> lockCurrentOperation() throws ControlLoopException { + // + // Sanity check + // + if (this.currentOperation == null) { + throw new ControlLoopException("Do not have a current operation."); + } + // + // Have we acquired it already? + // + if (this.targetLock != null) { + // + // TODO: Make sure the current lock is for the same target. + // Currently, it should be. But in the future it may not. + // + return new LockResult<GuardResult, TargetLock>(GuardResult.LOCK_ACQUIRED, this.targetLock); + } else { + // + // Ask the Guard + // + LockResult<GuardResult, TargetLock> lockResult = PolicyGuard.lockTarget( + this.currentOperation.policy.target.type, + this.getTargetInstance(this.currentOperation.policy), + this.onset.requestID, + this); + // + // Was it acquired? + // + if (lockResult.getA().equals(GuardResult.LOCK_ACQUIRED)) { + // + // Yes, let's save it + // + this.targetLock = lockResult.getB(); + } + return lockResult; + } + } + + public synchronized TargetLock unlockCurrentOperation() { + if (this.targetLock == null) { + return null; + } + if (PolicyGuard.unlockTarget(this.targetLock) == true) { + TargetLock returnLock = this.targetLock; + this.targetLock = null; + return returnLock; + } + return null; + } + + public enum NEW_EVENT_STATUS { + FIRST_ONSET, + SUBSEQUENT_ONSET, + FIRST_ABATEMENT, + SUBSEQUENT_ABATEMENT, + SYNTAX_ERROR + ; + } + + public NEW_EVENT_STATUS onNewEvent(VirtualControlLoopEvent event) { + try { + ControlLoopEventManager.checkEventSyntax(event); + if (event.closedLoopEventStatus == ControlLoopEventStatus.ONSET) { + // + // Check if this is our original ONSET + // + if (event.equals(this.onset)) { + // + // DO NOT retract it + // + return NEW_EVENT_STATUS.FIRST_ONSET; + } + // + // Log that we got an onset + // + this.numOnsets++; + return NEW_EVENT_STATUS.SUBSEQUENT_ONSET; + } else if (event.closedLoopEventStatus == ControlLoopEventStatus.ABATED) { + // + // Have we already got an abatement? + // + if (this.abatement == null) { + // + // Save this + // + this.abatement = event; + // + // Keep track that we received another + // + this.numAbatements++; + // + // + // + return NEW_EVENT_STATUS.FIRST_ABATEMENT; + } else { + // + // Keep track that we received another + // + this.numAbatements++; + // + // + // + return NEW_EVENT_STATUS.SUBSEQUENT_ABATEMENT; + } + } else { + return NEW_EVENT_STATUS.SYNTAX_ERROR; + } + } catch (ControlLoopException e) { + return NEW_EVENT_STATUS.SYNTAX_ERROR; + } + } + + public VirtualControlLoopNotification setControlLoopTimedOut() { + this.controlLoopTimedOut = FinalResult.FINAL_FAILURE_TIMEOUT; + VirtualControlLoopNotification notification = new VirtualControlLoopNotification(this.onset); + notification.notification = ControlLoopNotificationType.FINAL_FAILURE; + notification.message = "Control Loop timed out"; + notification.history.addAll(this.controlLoopHistory); + return notification; + } + + public boolean isControlLoopTimedOut() { + return (this.controlLoopTimedOut == FinalResult.FINAL_FAILURE_TIMEOUT); + } + + public int getControlLoopTimeout(Integer defaultTimeout) { + if (this.processor != null && this.processor.getControlLoop() != null) { + return this.processor.getControlLoop().timeout; + } + if (defaultTimeout != null) { + return defaultTimeout; + } + return 0; + } + + public static void checkEventSyntax(VirtualControlLoopEvent event) throws ControlLoopException { + if (event.closedLoopEventStatus == null || + (event.closedLoopEventStatus != ControlLoopEventStatus.ONSET && + event.closedLoopEventStatus != ControlLoopEventStatus.ABATED)) { + throw new ControlLoopException("Invalid value in closedLoopEventStatus"); + } + if (event.closedLoopControlName == null || event.closedLoopControlName.length() < 1) { + throw new ControlLoopException("No control loop name"); + } + if (event.requestID == null) { + throw new ControlLoopException("No request ID"); + } + if (event.AAI == null) { + throw new ControlLoopException("AAI is null"); + } + if (event.AAI.get("vserver.is-closed-loop-disabled") == null) { + throw new ControlLoopException("vserver.is-closed-loop-disabled information missing"); + } + if (event.AAI.get("vserver.is-closed-loop-disabled").equalsIgnoreCase("true") || + event.AAI.get("vserver.is-closed-loop-disabled").equalsIgnoreCase("T") || + event.AAI.get("vserver.is-closed-loop-disabled").equalsIgnoreCase("yes") || + event.AAI.get("vserver.is-closed-loop-disabled").equalsIgnoreCase("Y")) { + throw new ControlLoopException("vserver.is-closed-loop-disabled is set to true"); + } + if (event.target == null || event.target.length() < 1) { + throw new ControlLoopException("No target field"); + } else { + if (! event.target.equalsIgnoreCase("VM_NAME") && + ! event.target.equalsIgnoreCase("VNF_NAME") && + ! event.target.equalsIgnoreCase("vserver.vserver-name") && + ! event.target.equalsIgnoreCase("generic-vnf.vnf-name") ) { + throw new ControlLoopException("target field invalid - expecting VM_NAME or VNF_NAME"); + } + } + } + + @Override + public boolean isActive() { + // TODO + return true; + } + + @Override + public boolean releaseLock() { + // TODO + return false; + } + + public String getTargetInstance(Policy policy) { + if (policy.target != null) { + if (policy.target.type != null) { + switch(policy.target.type) { + case PNF: + break; + case VM: + if (this.onset.target.equalsIgnoreCase("vserver.vserver-name")) { + return this.onset.AAI.get("vserver.vserver-name"); + } + break; + default: + break; + } + } + } + return null; + } + + @Override + public String toString() { + return "ControlLoopEventManager [closedLoopControlName=" + closedLoopControlName + ", requestID=" + requestID + + ", processor=" + processor + ", onset=" + (onset != null ? onset.requestID : "null") + ", numOnsets=" + numOnsets + ", numAbatements=" + + numAbatements + ", isActivated=" + + isActivated + ", currentOperation=" + currentOperation + ", targetLock=" + targetLock + "]"; + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java new file mode 100644 index 000000000..81c85b1e8 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager.java @@ -0,0 +1,520 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop operation manager + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.LinkedList; + +import javax.persistence.EntityManager; +import javax.persistence.Persistence; + +import org.onap.policy.appc.Response; +import org.onap.policy.appc.ResponseCode; + +import org.onap.policy.controlloop.ControlLoopEvent; +import org.onap.policy.controlloop.ControlLoopOperation; +import org.onap.policy.controlloop.VirtualControlLoopEvent; + +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.onap.policy.controlloop.actor.appc.APPCActorServiceProvider; + + +public class ControlLoopOperationManager implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -3773199283624595410L; + + @Override + public String toString() { + return "ControlLoopOperationManager [onset=" + (onset != null ? onset.requestID : "null") + ", policy=" + + (policy != null ? policy.id : "null") + ", attempts=" + attempts + + ", policyResult=" + policyResult + + ", currentOperation=" + currentOperation + ", operationHistory=" + operationHistory + + "]"; + } + + // + // These properties are not changeable, but accessible + // for Drools Rule statements. + // + //public final ATTControlLoopEvent onset; + public final ControlLoopEvent onset; + public final Policy policy; + + // + // Properties used to track the Operation + // + private int attempts = 0; + private Operation currentOperation = null; + private LinkedList<Operation> operationHistory = new LinkedList<Operation>(); + private PolicyResult policyResult = null; + private ControlLoopEventManager eventManager = null; + + public ControlLoopEventManager getEventManager() { + return eventManager; + } + + public void setEventManager(ControlLoopEventManager eventManager) { + this.eventManager = eventManager; + } + + + // + // Internal class used for tracking + // + private class Operation { + public ControlLoopOperation operation = new ControlLoopOperation(); + public PolicyResult policyResult = null; + public int attempt = 0; + + @Override + public String toString() { + return "Operation [attempt=" + attempt + ", policyResult=" + policyResult + ", operation=" + operation + + "]"; + } + } + + private String guardApprovalStatus = "NONE";//"NONE", "PERMIT", "DENY" + private Object operationRequest; + + public Object getOperationRequest() { + return operationRequest; + } + + public String getGuardApprovalStatus() { + return guardApprovalStatus; + } + public void setGuardApprovalStatus(String guardApprovalStatus) { + this.guardApprovalStatus = guardApprovalStatus; + } + + + public ControlLoopOperationManager(/*ATTControlLoopEvent*/ControlLoopEvent onset, Policy policy, ControlLoopEventManager em) throws ControlLoopException { + this.onset = onset; + this.policy = policy; + this.guardApprovalStatus = "NONE"; + this.eventManager = em; + + // + // Let's make a sanity check + // + switch (policy.actor) { + case "APPC": + break; + case "AOTS": + break; + case "MSO": + break; + case "SDNO": + break; + case "SDNR": + break; + default: + throw new ControlLoopException("ControlLoopEventManager: policy has an unknown actor."); + } + } + + public Object startOperation(/*VirtualControlLoopEvent*/ControlLoopEvent onset) { + // + // They shouldn't call us if we currently running something + // + if (this.currentOperation != null) { + // + // what do we do if we are already running an operation? + // + return null; + } + // + // Check if we have maxed out on retries + // + if (this.policy.retry == null || this.policy.retry < 1) { + // + // No retries are allowed, so check have we even made + // one attempt to execute the operation? + // + if (this.attempts >= 1) { + // + // We have, let's ensure our PolicyResult is set + // + if (this.policyResult == null) { + this.policyResult = PolicyResult.FAILURE_RETRIES; + } + // + // + // + return null; + } + } else { + // + // Have we maxed out on retries? + // + if (this.attempts > this.policy.retry) { + if (this.policyResult == null) { + this.policyResult = PolicyResult.FAILURE_RETRIES; + } + return null; + } + } + // + // Setup + // + this.policyResult = null; + Operation operation = new Operation(); + operation.attempt = ++this.attempts; + operation.operation.actor = this.policy.actor.toString(); + operation.operation.operation = this.policy.recipe; + operation.operation.target = this.policy.target.toString(); + operation.operation.subRequestId = Integer.toString(operation.attempt); + // + // Now determine which actor we need to construct a request for + // + switch (policy.actor) { + case "APPC": + //Request request = APPCActorServiceProvider.constructRequest(onset, operation.operation, this.policy); + this.operationRequest = APPCActorServiceProvider.constructRequest((VirtualControlLoopEvent)onset, operation.operation, this.policy); + // + // Save the operation + // + this.currentOperation = operation; + //System.out.print("************* BEFORE STORING....."); + //this.storeOperationInDataBase("startOperation"); + //System.out.print("************* AFTER STORING....."); + // + return operationRequest; + case "MSO": + // + // We are not supporting MSO interface at the moment + // + System.out.println("We are not supporting MSO actor in the latest release."); + return null; + } + return null; + } + + public PolicyResult onResponse(Object response) { + // + // Which response is it? + // + if (response instanceof Response) { + // + // Cast it + // + Response appcResponse = (Response) response; + // + // Determine which subrequestID (ie. attempt) + // + Integer operationAttempt = null; + try { + operationAttempt = Integer.parseInt(appcResponse.CommonHeader.SubRequestID); + } catch (NumberFormatException e) { + // + // We cannot tell what happened if this doesn't exist + // + this.completeOperation(operationAttempt, "Policy was unable to parse APP-C SubRequestID (it was null).", PolicyResult.FAILURE_EXCEPTION); + return PolicyResult.FAILURE_EXCEPTION; + } + // + // Sanity check the response message + // + if (appcResponse.Status == null) { + // + // We cannot tell what happened if this doesn't exist + // + this.completeOperation(operationAttempt, "Policy was unable to parse APP-C response status field (it was null).", PolicyResult.FAILURE_EXCEPTION); + return PolicyResult.FAILURE_EXCEPTION; + } + // + // Get the Response Code + // + ResponseCode code = ResponseCode.toResponseCode(appcResponse.Status.Code); + if (code == null) { + // + // We are unaware of this code + // + this.completeOperation(operationAttempt, "Policy was unable to parse APP-C response status code field.", PolicyResult.FAILURE_EXCEPTION); + return PolicyResult.FAILURE_EXCEPTION; + } + // + // Ok, let's figure out what APP-C's response is + // + switch (code) { + case ACCEPT: + // + // This is good, they got our original message and + // acknowledged it. + // + // Is there any need to track this? + // + return null; + case ERROR: + case REJECT: + // + // We'll consider these two codes as exceptions + // + this.completeOperation(operationAttempt, appcResponse.getStatus().Description, PolicyResult.FAILURE_EXCEPTION); + if (this.policyResult != null && this.policyResult.equals(PolicyResult.FAILURE_TIMEOUT)) { + return null; + } + return PolicyResult.FAILURE_EXCEPTION; + case SUCCESS: + // + // + // + this.completeOperation(operationAttempt, appcResponse.getStatus().Description, PolicyResult.SUCCESS); + if (this.policyResult != null && this.policyResult.equals(PolicyResult.FAILURE_TIMEOUT)) { + return null; + } + return PolicyResult.SUCCESS; + case FAILURE: + // + // + // + this.completeOperation(operationAttempt, appcResponse.getStatus().Description, PolicyResult.FAILURE); + if (this.policyResult != null && this.policyResult.equals(PolicyResult.FAILURE_TIMEOUT)) { + return null; + } + return PolicyResult.FAILURE; + } + } + return null; + } + + public Integer getOperationTimeout() { + // + // Sanity check + // + if (this.policy == null) { + System.out.println("getOperationTimeout returning 0"); + return 0; + } + System.out.println("getOperationTimeout returning " + this.policy.timeout); + return this.policy.timeout; + } + + public String getOperationTimeoutString(int defaultTimeout) { + Integer to = this.getOperationTimeout(); + if (to == null || to == 0) { + return Integer.toString(defaultTimeout) + "s"; + } + return to.toString() + "s"; + } + + public PolicyResult getOperationResult() { + return this.policyResult; + } + + public String getOperationMessage() { + if (this.currentOperation != null && this.currentOperation.operation != null) { + return this.currentOperation.operation.toMessage(); + } + if (this.operationHistory != null && this.operationHistory.size() > 0) { + return this.operationHistory.getLast().operation.toMessage(); + } + return null; + } + + public String getOperationMessage(String guardResult) { + if (this.currentOperation != null && this.currentOperation.operation != null) { + return this.currentOperation.operation.toMessage()+ ", Guard result: " + guardResult; + } + if (this.operationHistory != null && this.operationHistory.size() > 0) { + return this.operationHistory.getLast().operation.toMessage() + ", Guard result: " + guardResult; + } + return null; + } + + public String getOperationHistory() { + if (this.currentOperation != null && this.currentOperation.operation != null) { + return this.currentOperation.operation.toHistory(); + } + if (this.operationHistory != null && this.operationHistory.size() > 0) { + return this.operationHistory.getLast().operation.toHistory(); + } + return null; + } + + public LinkedList<ControlLoopOperation> getHistory() { + LinkedList<ControlLoopOperation> history = new LinkedList<ControlLoopOperation>(); + for (Operation op : this.operationHistory) { + history.add(new ControlLoopOperation(op.operation)); + + } + return history; + } + + public void setOperationHasTimedOut() { + // + // + // + this.completeOperation(this.attempts, "Operation timed out", PolicyResult.FAILURE_TIMEOUT); + } + + public void setOperationHasGuardDeny() { + // + // + // + this.completeOperation(this.attempts, "Operation denied by Guard", PolicyResult.FAILURE_GUARD); + } + + public boolean isOperationComplete() { + // + // Is there currently a result? + // + if (this.policyResult == null) { + // + // either we are in process or we + // haven't started + // + return false; + } + // + // We have some result, check if the operation failed + // + if (this.policyResult.equals(PolicyResult.FAILURE)) { + // + // Check if there were no retries specified + // + if (policy.retry == null || policy.retry == 0) { + // + // The result is the failure + // + return true; + } + // + // Check retries + // + if (this.isRetriesMaxedOut()) { + // + // No more attempts allowed, reset + // that our actual result is failure due to retries + // + this.policyResult = PolicyResult.FAILURE_RETRIES; + return true; + } else { + // + // There are more attempts available to try the + // policy recipe. + // + return false; + } + } + // + // Other results mean we are done + // + return true; + } + + public boolean isOperationRunning() { + return (this.currentOperation != null); + } + + private boolean isRetriesMaxedOut() { + if (policy.retry == null || policy.retry == 0) { + // + // There were NO retries specified, so declare + // this as completed. + // + return (this.attempts > 0); + } + return (this.attempts > policy.retry); + } + + private void storeOperationInDataBase(){ + + EntityManager em; + try{ + em = Persistence.createEntityManagerFactory("OperationsHistoryPU").createEntityManager();//emf.createEntityManager(); + }catch(Exception e){ + System.err.println("Test thread got Exception " + e.getLocalizedMessage() + " Can't write to Operations History DB."); + return; + } + + OperationsHistoryDbEntry newEntry = new OperationsHistoryDbEntry(); + + newEntry.closedLoopName = this.onset.closedLoopControlName; + newEntry.requestId = this.onset.requestID.toString(); + newEntry.actor = this.currentOperation.operation.actor; + newEntry.operation = this.currentOperation.operation.operation; + newEntry.target = this.eventManager.getTargetInstance(this.policy); + newEntry.starttime = Timestamp.from(this.currentOperation.operation.start); + newEntry.subrequestId = this.currentOperation.operation.subRequestId; + newEntry.endtime = new Timestamp(this.currentOperation.operation.end.toEpochMilli()); + newEntry.message = this.currentOperation.operation.message; + newEntry.outcome = this.currentOperation.operation.outcome; + + em.getTransaction().begin(); + em.persist(newEntry); + em.getTransaction().commit(); + + em.close(); + + } + + + + private void completeOperation(Integer attempt, String message, PolicyResult result) { + if (attempt == null) { + System.out.println("attempt cannot be null (i.e. subRequestID)"); + return; + } + if (this.currentOperation != null) { + if (this.currentOperation.attempt == attempt.intValue()) { + this.currentOperation.operation.end = Instant.now(); + this.currentOperation.operation.message = message; + this.currentOperation.operation.outcome = result.toString(); + this.currentOperation.policyResult = result; + // + // Save it in history + // + this.operationHistory.add(this.currentOperation); + this.storeOperationInDataBase(); + // + // Set our last result + // + this.policyResult = result; + // + // Clear the current operation field + // + this.currentOperation = null; + return; + } + System.out.println("not current"); + } + for (Operation op : this.operationHistory) { + if (op.attempt == attempt.intValue()) { + op.operation.end = Instant.now(); + op.operation.message = message; + op.operation.outcome = result.toString(); + op.policyResult = result; + return; + } + } + System.out.println("Could not find associated operation"); + + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/OperationsHistoryDbEntry.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/OperationsHistoryDbEntry.java new file mode 100644 index 000000000..82775053e --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/OperationsHistoryDbEntry.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.eventmanager; + +import java.io.Serializable; +import java.sql.Timestamp; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Table; + + +@Entity +@Table(name="operationshistory10") +public class OperationsHistoryDbEntry implements Serializable{ + + /** + * + */ + private static final long serialVersionUID = 1L; + + @Id@GeneratedValue + @Column(name="ROWID") + public long rowid; + + @Column(name="CLNAME") + public String closedLoopName; + + public String requestId; + + public String actor; + + public String operation; + + public String target; + + public Timestamp starttime; + + public Timestamp endtime; + + public String subrequestId; + + public String outcome; + + public String message; + +} + + diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopLoggerStdOutImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopLoggerStdOutImpl.java new file mode 100644 index 000000000..acf7f1d22 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopLoggerStdOutImpl.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.policy.controlloop.impl; + +import org.onap.policy.controlloop.ControlLoopLogger; + +public class ControlLoopLoggerStdOutImpl implements ControlLoopLogger { + + public ControlLoopLoggerStdOutImpl() { + } + + @Override + public void info(String... parameters) { + StringBuilder builder = new StringBuilder(); + for (String param : parameters) { + builder.append(param); + builder.append(" " ); + } + System.out.println(builder.toString().trim()); + } + + @Override + public void metrics(String... msgs) { + this.info(msgs); + } + + @Override + public void metrics(Object obj) { + this.info(obj.toString()); + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopPublisherJUnitImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopPublisherJUnitImpl.java new file mode 100644 index 000000000..37721a9a1 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/impl/ControlLoopPublisherJUnitImpl.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.impl; + +import org.onap.policy.controlloop.ControlLoopPublisher; + +public class ControlLoopPublisherJUnitImpl implements ControlLoopPublisher { + + public ControlLoopPublisherJUnitImpl() { + + } + + @Override + public void publish(Object object) { + + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java new file mode 100644 index 000000000..bc94068ab --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/processor/ControlLoopProcessor.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * controlloop processor + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.processor; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.Constructor; + +import org.onap.policy.controlloop.ControlLoopException; +import org.onap.policy.controlloop.policy.ControlLoop; +import org.onap.policy.controlloop.policy.ControlLoopPolicy; +import org.onap.policy.controlloop.policy.FinalResult; +import org.onap.policy.controlloop.policy.Policy; +import org.onap.policy.controlloop.policy.PolicyResult; + +public class ControlLoopProcessor { + + private final String yaml; + private final ControlLoopPolicy policy; + private String currentPolicy = null; + + public ControlLoopProcessor(String yaml) throws ControlLoopException { + this.yaml = yaml; + try { + Yaml y = new Yaml(new Constructor(ControlLoopPolicy.class)); + Object obj = y.load(this.yaml); + if (obj instanceof ControlLoopPolicy) { + this.policy = (ControlLoopPolicy) obj; + this.currentPolicy = this.policy.controlLoop.trigger_policy; + } else { + this.policy = null; + throw new ControlLoopException("Unable to parse yaml into ControlLoopPolicy object"); + } + } catch (Exception e) { + // + // Most likely this is a YAML Exception + // + throw new ControlLoopException(e); + } + } + + public ControlLoop getControlLoop() { + return this.policy.controlLoop; + } + + public FinalResult checkIsCurrentPolicyFinal() { + return FinalResult.toResult(this.currentPolicy); + } + + public Policy getCurrentPolicy() { + for (Policy policy : this.policy.policies) { + if (policy.id.equals(this.currentPolicy)) { + return policy; + } + } + return null; + } + + public void nextPolicyForResult(PolicyResult result) throws ControlLoopException { + Policy policy = this.getCurrentPolicy(); + try { + if (this.policy == null) { + throw new ControlLoopException("There is no current policy to determine where to go to."); + } + switch (result) { + case SUCCESS: + this.currentPolicy = policy.success; + break; + case FAILURE: + this.currentPolicy = policy.failure; + break; + case FAILURE_TIMEOUT: + this.currentPolicy = policy.failure_timeout; + break; + case FAILURE_RETRIES: + this.currentPolicy = policy.failure_retries; + break; + case FAILURE_EXCEPTION: + this.currentPolicy = policy.failure_exception; + break; + case FAILURE_GUARD: + this.currentPolicy = policy.failure_guard; + break; + default: + throw new ControlLoopException("Bad policy result given: " + result); + } + } catch (ControlLoopException e) { + this.currentPolicy = FinalResult.FINAL_FAILURE_EXCEPTION.toString(); + throw e; + } + } + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java new file mode 100644 index 000000000..07a273cc7 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/PolicyEngine.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * policy engine + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.drools; + +public interface PolicyEngine { + + public boolean deliver(String busType, String topic, Object obj); + +} diff --git a/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java new file mode 100644 index 000000000..5c019c487 --- /dev/null +++ b/controlloop/common/eventmanager/src/main/java/org/onap/policy/drools/impl/PolicyEngineJUnitImpl.java @@ -0,0 +1,104 @@ +/*- + * ============LICENSE_START======================================================= + * policy engine + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.impl; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import org.onap.policy.appc.Request; +import org.onap.policy.controlloop.ControlLoopNotification; +import org.onap.policy.controlloop.util.Serialization; + +import org.onap.policy.drools.PolicyEngine; + +public class PolicyEngineJUnitImpl implements PolicyEngine { + + private Map<String, Map<String, Queue<Object>>> busMap = new HashMap<String, Map<String, Queue<Object>>>(); + + @Override + public boolean deliver(String busType, String topic, Object obj) { + if (obj instanceof ControlLoopNotification) { + ControlLoopNotification notification = (ControlLoopNotification) obj; + //System.out.println("Notification: " + notification.notification + " " + (notification.message == null ? "" : notification.message) + " " + notification.history); + System.out.println(Serialization.gsonPretty.toJson(notification)); + } + if (obj instanceof Request) { + Request request = (Request) obj; + System.out.println("Request: " + request.Action + " subrequest " + request.CommonHeader.SubRequestID); + } + // + // Does the bus exist? + // + if (busMap.containsKey(busType) == false) { + System.out.println("creating new bus type " + busType); + // + // Create the bus + // + busMap.put(busType, new HashMap<String, Queue<Object>>()); + } + // + // Get the bus + // + Map<String, Queue<Object>> topicMap = busMap.get(busType); + // + // Does the topic exist? + // + if (topicMap.containsKey(topic) == false) { + System.out.println("creating new topic " + topic); + // + // Create the topic + // + topicMap.put(topic, new LinkedList<Object>()); + } + // + // Get the topic queue + // + System.out.println("queueing"); + return topicMap.get(topic).add(obj); + } + + public Object subscribe(String busType, String topic) { + // + // Does the bus exist? + // + if (busMap.containsKey(busType)) { + // + // Get the bus + // + Map<String, Queue<Object>> topicMap = busMap.get(busType); + // + // Does the topic exist? + // + if (topicMap.containsKey(topic)) { + System.out.println("The queue has " + topicMap.get(topic).size()); + return topicMap.get(topic).poll(); + } else { + System.err.println("No topic exists " + topic); + } + } else { + System.err.println("No bus exists " + busType); + } + return null; + } + +} |