diff options
Diffstat (limited to 'appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java')
-rw-r--r-- | appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java | 381 |
1 files changed, 187 insertions, 194 deletions
diff --git a/appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java b/appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java index 1bbf8d0d1..34d40dd55 100644 --- a/appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java +++ b/appc-config/appc-flow-controller/provider/src/main/java/org/onap/appc/flow/controller/node/FlowControlNode.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP : APPC * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ============================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,212 +95,205 @@ import org.onap.ccsdk.sli.core.sli.SvcLogicJavaPlugin; public class FlowControlNode implements SvcLogicJavaPlugin { - private static final EELFLogger log = EELFManager.getInstance().getLogger(FlowControlNode.class); - - private final FlowControlDBService dbService; - private final FlowSequenceGenerator flowSequenceGenerator; - - public FlowControlNode() { - this.dbService = FlowControlDBService.initialise(); - this.flowSequenceGenerator = new FlowSequenceGenerator(); - } - - FlowControlNode(FlowControlDBService dbService, FlowSequenceGenerator flowSequenceGenerator) { - this.dbService = dbService; - this.flowSequenceGenerator = flowSequenceGenerator; - } - - public void processFlow(Map<String, String> inParams, SvcLogicContext ctx) - throws SvcLogicException { - log.debug("Received processParamKeys call with params : " + inParams); - String responsePrefix = inParams.get(INPUT_PARAM_RESPONSE_PREFIX); - try { - responsePrefix = StringUtils.isNotBlank(responsePrefix) ? (responsePrefix + ".") : ""; - SvcLogicContext localContext = new SvcLogicContext(); - - localContext.setAttribute(REQUEST_ID, ctx.getAttribute(REQUEST_ID)); - localContext.setAttribute(VNF_TYPE, ctx.getAttribute(VNF_TYPE)); - localContext.setAttribute(REQUEST_ACTION, ctx.getAttribute(REQUEST_ACTION)); - localContext.setAttribute(ACTION_LEVEL, ctx.getAttribute(ACTION_LEVEL)); - localContext.setAttribute(RESPONSE_PREFIX, responsePrefix); - ctx.setAttribute(RESPONSE_PREFIX, responsePrefix); - - dbService.getFlowReferenceData(ctx, inParams, localContext); - - for (String key : localContext.getAttributeKeySet()) { - log.debug("processFlow " + key + "=" + ctx.getAttribute(key)); - } - processFlowSequence(inParams, ctx, localContext); - if (!ctx.getAttribute(responsePrefix + OUTPUT_PARAM_STATUS).equals(OUTPUT_STATUS_SUCCESS)) { - throw new SvcLogicException(ctx.getAttribute(responsePrefix + OUTPUT_STATUS_MESSAGE)); - } - } catch (Exception e) { - ctx.setAttribute(responsePrefix + OUTPUT_PARAM_STATUS, OUTPUT_STATUS_FAILURE); - ctx.setAttribute(responsePrefix + OUTPUT_PARAM_ERROR_MESSAGE, e.getMessage()); - log.error("Error occurred in processFlow ", e); - throw new SvcLogicException(e.getMessage()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(FlowControlNode.class); + + private final FlowControlDBService dbService; + private final FlowSequenceGenerator flowSequenceGenerator; + + public FlowControlNode() { + this.dbService = FlowControlDBService.initialise(); + this.flowSequenceGenerator = new FlowSequenceGenerator(); } - } - - private void processFlowSequence(Map<String, String> inParams, SvcLogicContext ctx, SvcLogicContext localContext) - throws Exception { - - String fn = "FlowExecutorNode.processflowSequence"; - log.debug(fn + "Received model for flow : " + localContext.toString()); - - localContext - .getAttributeKeySet() - .forEach(key -> log.debug(key + "=" + ctx.getAttribute(key))); - - String flowSequence = flowSequenceGenerator.getFlowSequence(inParams, ctx, localContext); - - log.debug("Received Flow Sequence : " + flowSequence); - HashMap<Integer, Transaction> transactionMap = createTransactionMap(flowSequence, localContext); - executeAllTransaction(transactionMap, ctx); - log.info("Executed all the transaction successfully"); - } - - private void executeAllTransaction(HashMap<Integer, Transaction> transactionMap, SvcLogicContext ctx) - throws Exception { - - String fn = "FlowExecutorNode.executeAllTransaction "; - int retry = 0; - FlowExecutorInterface flowExecutor; - for (int key = 1; key <= transactionMap.size(); key++) { - log.debug(fn + "Starting transactions ID " + key + " :)=" + retry); - Transaction transaction = transactionMap.get(key); - if (!preProcessor(transactionMap, transaction)) { - log.info("Skipping Transaction ID " + transaction.getTransactionId()); - continue; - } - if (transaction.getExecutionType() != null) { - switch (transaction.getExecutionType()) { - case GRAPH: - flowExecutor = new GraphExecutor(); - break; - case NODE: - flowExecutor = new NodeExecutor(); - break; - case REST: - flowExecutor = new RestExecutor(); - break; - default: - throw new Exception("No Executor found for transaction ID" + transaction.getTransactionId()); - } - flowExecutor.execute(transaction, ctx); - ResponseAction responseAction = handleResponse(transaction); - if (responseAction.getWait() != null && Integer.parseInt(responseAction.getWait()) > 0) { - log.debug(fn + "Going to Sleep .... " + responseAction.getWait()); - Thread.sleep(Integer.parseInt(responseAction.getWait()) * 1000L); - } - if (responseAction.isIntermediateMessage()) { - log.debug(fn + "Sending Intermediate Message back .... "); - sendIntermediateMessage(); - } - if (responseAction.getRetry() != null && Integer.parseInt(responseAction.getRetry()) > retry) { - log.debug(fn + "Ooppss!!! We will retry again ....... "); - key--; - retry++; - log.debug(fn + "key =" + key + "retry =" + retry); - } - if (responseAction.isIgnore()) { - log.debug(fn + "Ignoring this Error and moving ahead ....... "); - continue; - } - if (responseAction.isStop()) { - log.debug(fn + "Need to Stop ....... "); - break; + FlowControlNode(FlowControlDBService dbService, FlowSequenceGenerator flowSequenceGenerator) { + this.dbService = dbService; + this.flowSequenceGenerator = flowSequenceGenerator; + } + + public void processFlow(Map<String, String> inParams, SvcLogicContext ctx) throws SvcLogicException { + log.debug("Received processParamKeys call with params : " + inParams); + String responsePrefix = inParams.get(INPUT_PARAM_RESPONSE_PREFIX); + try { + responsePrefix = StringUtils.isNotBlank(responsePrefix) ? (responsePrefix + ".") : ""; + SvcLogicContext localContext = new SvcLogicContext(); + + localContext.setAttribute(REQUEST_ID, ctx.getAttribute(REQUEST_ID)); + localContext.setAttribute(VNF_TYPE, ctx.getAttribute(VNF_TYPE)); + localContext.setAttribute(REQUEST_ACTION, ctx.getAttribute(REQUEST_ACTION)); + localContext.setAttribute(ACTION_LEVEL, ctx.getAttribute(ACTION_LEVEL)); + localContext.setAttribute(RESPONSE_PREFIX, responsePrefix); + ctx.setAttribute(RESPONSE_PREFIX, responsePrefix); + + dbService.getFlowReferenceData(ctx, inParams, localContext); + + for (String key : localContext.getAttributeKeySet()) { + log.debug("processFlow " + key + "=" + ctx.getAttribute(key)); + } + processFlowSequence(inParams, ctx, localContext); + if (!ctx.getAttribute(responsePrefix + OUTPUT_PARAM_STATUS).equals(OUTPUT_STATUS_SUCCESS)) { + throw new SvcLogicException(ctx.getAttribute(responsePrefix + OUTPUT_STATUS_MESSAGE)); + } + } catch (Exception e) { + ctx.setAttribute(responsePrefix + OUTPUT_PARAM_STATUS, OUTPUT_STATUS_FAILURE); + ctx.setAttribute(responsePrefix + OUTPUT_PARAM_ERROR_MESSAGE, e.getMessage()); + log.error("Error occurred in processFlow ", e); + throw new SvcLogicException(e.getMessage()); } - if (responseAction.getJump() != null && Integer.parseInt(responseAction.getJump()) > 0) { - key = Integer.parseInt(responseAction.getJump()); - key--; + } + + private void processFlowSequence(Map<String, String> inParams, SvcLogicContext ctx, SvcLogicContext localContext) + throws Exception { + + String fn = "FlowExecutorNode.processflowSequence"; + log.debug(fn + "Received model for flow : " + localContext.toString()); + + localContext.getAttributeKeySet().forEach(key -> log.debug(key + "=" + ctx.getAttribute(key))); + + String flowSequence = flowSequenceGenerator.getFlowSequence(inParams, ctx, localContext); + + log.debug("Received Flow Sequence : " + flowSequence); + HashMap<Integer, Transaction> transactionMap = createTransactionMap(flowSequence, localContext); + executeAllTransaction(transactionMap, ctx); + log.info("Executed all the transaction successfully"); + } + + private void executeAllTransaction(HashMap<Integer, Transaction> transactionMap, SvcLogicContext ctx) + throws Exception { + + String fn = "FlowExecutorNode.executeAllTransaction "; + int retry = 0; + FlowExecutorInterface flowExecutor; + for (int key = 1; key <= transactionMap.size(); key++) { + log.debug(fn + "Starting transactions ID " + key + " :)=" + retry); + Transaction transaction = transactionMap.get(key); + if (!preProcessor(transactionMap, transaction)) { + log.info("Skipping Transaction ID " + transaction.getTransactionId()); + continue; + } + if (transaction.getExecutionType() != null) { + switch (transaction.getExecutionType()) { + case GRAPH: + flowExecutor = new GraphExecutor(); + break; + case NODE: + flowExecutor = new NodeExecutor(); + break; + case REST: + flowExecutor = new RestExecutor(); + break; + default: + throw new Exception("No Executor found for transaction ID" + transaction.getTransactionId()); + } + flowExecutor.execute(transaction, ctx); + ResponseAction responseAction = handleResponse(transaction, ctx); + + if (responseAction.getWait() != null && Integer.parseInt(responseAction.getWait()) > 0) { + log.debug(fn + "Going to Sleep .... " + responseAction.getWait()); + Thread.sleep(Integer.parseInt(responseAction.getWait()) * 1000L); + } + if (responseAction.isIntermediateMessage()) { + log.debug(fn + "Sending Intermediate Message back .... "); + sendIntermediateMessage(); + } + if (responseAction.getRetry() != null && Integer.parseInt(responseAction.getRetry()) > retry) { + log.debug(fn + "Ooppss!!! We will retry again ....... "); + key--; + retry++; + log.debug(fn + "key =" + key + "retry =" + retry); + } + if (responseAction.isIgnore()) { + log.debug(fn + "Ignoring this Error and moving ahead ....... "); + continue; + } + if (responseAction.isStop()) { + log.debug(fn + "Need to Stop ....... "); + break; + } + if (responseAction.getJump() != null && Integer.parseInt(responseAction.getJump()) > 0) { + key = Integer.parseInt(responseAction.getJump()); + key--; + } + log.debug(fn + "key =" + key + "retry =" + retry); + + } else { + throw new Exception("Don't know how to execute transaction ID " + transaction.getTransactionId()); + } } - log.debug(fn + "key =" + key + "retry =" + retry); + } - } else { - throw new Exception("Don't know how to execute transaction ID " + transaction.getTransactionId()); - } + private void sendIntermediateMessage() { + // TODO Auto-generated method stub } - } - - private void sendIntermediateMessage() { - // TODO Auto-generated method stub - } - - private ResponseAction handleResponse(Transaction transaction) { - log.info("Handling Response for transaction Id " + transaction.getTransactionId()); - DefaultResponseHandler defaultHandler = new DefaultResponseHandler(); - return defaultHandler.handlerResponse(transaction); - } - - private boolean preProcessor(HashMap<Integer, Transaction> transactionMap, Transaction transaction) - throws IOException { - - log.debug("Starting Preprocessing Logic "); - boolean runThisStep = false; - try { - if (transaction.getPrecheck() != null - && transaction.getPrecheck().getPrecheckOptions() != null - && !transaction.getPrecheck().getPrecheckOptions().isEmpty()) { - - List<PrecheckOption> precheckOptions = transaction.getPrecheck().getPrecheckOptions(); - for (PrecheckOption precheck : precheckOptions) { - Transaction trans = transactionMap.get(precheck.getpTransactionID()); - ObjectMapper mapper = new ObjectMapper(); - log.info("Mapper= " + mapper.writeValueAsString(trans)); - HashMap trmap = mapper.readValue(mapper.writeValueAsString(trans), HashMap.class); - runThisStep = trmap.get(precheck.getParamName()) != null - && ((String) trmap.get(precheck.getParamName())) - .equalsIgnoreCase(precheck.getParamValue()); - - if (("any").equalsIgnoreCase(transaction.getPrecheck().getPrecheckOperator()) && runThisStep) { - break; - } + + private ResponseAction handleResponse(Transaction transaction, SvcLogicContext ctx) { + log.info("Handling Response for transaction Id " + transaction.getTransactionId()); + DefaultResponseHandler defaultHandler = new DefaultResponseHandler(); + return defaultHandler.handlerResponse(transaction, ctx); + } + + private boolean preProcessor(HashMap<Integer, Transaction> transactionMap, Transaction transaction) + throws IOException { + + log.debug("Starting Preprocessing Logic "); + boolean runThisStep = false; + try { + if (transaction.getPrecheck() != null && transaction.getPrecheck().getPrecheckOptions() != null + && !transaction.getPrecheck().getPrecheckOptions().isEmpty()) { + + List<PrecheckOption> precheckOptions = transaction.getPrecheck().getPrecheckOptions(); + for (PrecheckOption precheck : precheckOptions) { + Transaction trans = transactionMap.get(precheck.getpTransactionID()); + ObjectMapper mapper = new ObjectMapper(); + log.info("Mapper= " + mapper.writeValueAsString(trans)); + HashMap trmap = mapper.readValue(mapper.writeValueAsString(trans), HashMap.class); + runThisStep = trmap.get(precheck.getParamName()) != null + && ((String) trmap.get(precheck.getParamName())).equalsIgnoreCase(precheck.getParamValue()); + + if (("any").equalsIgnoreCase(transaction.getPrecheck().getPrecheckOperator()) && runThisStep) { + break; + } + } + } else { + log.debug("No Pre check defined for transaction ID " + transaction.getTransactionId()); + runThisStep = true; + } + } catch (Exception e) { + log.error("Error occured when Preprocessing Logic ", e); + throw e; } - } else { - log.debug("No Pre check defined for transaction ID " + transaction.getTransactionId()); - runThisStep = true; - } - } catch (Exception e) { - log.error("Error occured when Preprocessing Logic ", e); - throw e; + log.debug("Returing process current Transaction = " + runThisStep); + return runThisStep; } - log.debug("Returing process current Transaction = " + runThisStep); - return runThisStep; - } - - private HashMap<Integer, Transaction> createTransactionMap(String flowSequence, SvcLogicContext localContext) - throws Exception { - - ObjectMapper mapper = new ObjectMapper(); - Transactions transactions = mapper.readValue(flowSequence, Transactions.class); - HashMap<Integer, Transaction> transMap = new HashMap<>(); - for (Transaction transaction : transactions.getTransactions()) { - compileFlowDependencies(transaction, localContext); - //parse the Transactions Object and create records in process_flow_status table - //loadTransactionIntoStatus(transactions, ctx); - transMap.put(transaction.getTransactionId(), transaction); + + private HashMap<Integer, Transaction> createTransactionMap(String flowSequence, SvcLogicContext localContext) + throws Exception { + + ObjectMapper mapper = new ObjectMapper(); + Transactions transactions = mapper.readValue(flowSequence, Transactions.class); + HashMap<Integer, Transaction> transMap = new HashMap<>(); + for (Transaction transaction : transactions.getTransactions()) { + compileFlowDependencies(transaction, localContext); + // parse the Transactions Object and create records in process_flow_status table + // loadTransactionIntoStatus(transactions, ctx); + transMap.put(transaction.getTransactionId(), transaction); + } + return transMap; } - return transMap; - } - private void compileFlowDependencies(Transaction transaction, SvcLogicContext localContext) - throws Exception { + private void compileFlowDependencies(Transaction transaction, SvcLogicContext localContext) throws Exception { - dbService.populateModuleAndRPC(transaction, localContext.getAttribute(VNF_TYPE)); - ObjectMapper mapper = new ObjectMapper(); - log.debug("Individual Transaction Details :" + transaction.toString()); + dbService.populateModuleAndRPC(transaction, localContext.getAttribute(VNF_TYPE)); + ObjectMapper mapper = new ObjectMapper(); + log.debug("Individual Transaction Details :" + transaction.toString()); - if ((localContext.getAttribute(SEQUENCE_TYPE) == null) - || (localContext.getAttribute(SEQUENCE_TYPE) != null - && !localContext.getAttribute(SEQUENCE_TYPE) - .equalsIgnoreCase(DESINGTIME))) { + if ((localContext.getAttribute(SEQUENCE_TYPE) == null) || (localContext.getAttribute(SEQUENCE_TYPE) != null + && !localContext.getAttribute(SEQUENCE_TYPE).equalsIgnoreCase(DESINGTIME))) { - localContext.setAttribute("artifact-content", mapper.writeValueAsString(transaction)); - dbService.loadSequenceIntoDB(localContext); + localContext.setAttribute("artifact-content", mapper.writeValueAsString(transaction)); + dbService.loadSequenceIntoDB(localContext); + } + // get a field in transction class as transactionhandle interface and register + // the Handler here for each transactions } - //get a field in transction class as transactionhandle interface and register the Handler here for each trnactions - } } |