diff options
Diffstat (limited to 'services/services-onappf/src/main')
3 files changed, 153 insertions, 58 deletions
diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java index 4f68b90ae..1953939b7 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java @@ -21,17 +21,21 @@ package org.onap.policy.apex.services.onappf.handler; import com.google.gson.JsonObject; - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; - +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.service.engine.main.ApexMain; import org.onap.policy.apex.services.onappf.exception.ApexStarterException; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,31 +50,41 @@ public class ApexEngineHandler { private ApexMain apexMain; /** - * Constructs the object. Extracts the apex config and model files and instantiates the apex engine. + * Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine. * - * @param properties the properties which contains the policies and configurations received from pap - * @throws ApexStarterException if the apex engine instantiation failed using the properties passed + * @param policies the list of policies + * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ - public ApexEngineHandler(final Object properties) throws ApexStarterException { - final StandardCoder standardCoder = new StandardCoder(); - String policyModel; - String apexConfig; - try { - JsonObject body = standardCoder.decode(standardCoder.encode(properties), JsonObject.class); - final JsonObject engineServiceParameters = body.get("engineServiceParameters").getAsJsonObject(); - policyModel = standardCoder.encode(engineServiceParameters.get("policy_type_impl")); - engineServiceParameters.remove("policy_type_impl"); - apexConfig = standardCoder.encode(body); - } catch (final CoderException e) { - throw new ApexStarterException(e); - } + public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException { + Map<ToscaPolicyIdentifier, String[]> policyArgsMap = new LinkedHashMap<>(); + for (ToscaPolicy policy : policies) { + Object properties = policy.getProperties().get("content"); + final StandardCoder standardCoder = new StandardCoder(); + String policyModel; + String apexConfig; + try { + JsonObject body = standardCoder.decode(standardCoder.encode(properties), JsonObject.class); + final JsonObject engineServiceParameters = body.get("engineServiceParameters").getAsJsonObject(); + policyModel = standardCoder.encode(engineServiceParameters.get("policy_type_impl")); + engineServiceParameters.remove("policy_type_impl"); + apexConfig = standardCoder.encode(body); + } catch (final CoderException e) { + throw new ApexStarterException(e); + } + + final String modelFilePath = createFile(policyModel, "modelFile"); - final String modelFilePath = createFile(policyModel, "modelFile"); + final String apexConfigFilePath = createFile(apexConfig, "apexConfigFile"); + final String[] apexArgs = { "-c", apexConfigFilePath, "-m", modelFilePath }; + policyArgsMap.put(policy.getIdentifier(), apexArgs); + } - final String apexConfigFilePath = createFile(apexConfig, "apexConfigFile"); - final String[] apexArgs = { "-c", apexConfigFilePath, "-m", modelFilePath }; LOGGER.debug("Starting apex engine."); - apexMain = new ApexMain(apexArgs); + try { + apexMain = new ApexMain(policyArgsMap); + } catch (ApexException e) { + throw new ApexStarterException(e); + } } /** @@ -100,6 +114,13 @@ public class ApexEngineHandler { } /** + * Method that return the list of running policies in the apex engine. + */ + public List<ToscaPolicyIdentifier> getRunningPolicies() { + return new ArrayList<>(apexMain.getApexParametersMap().keySet()); + } + + /** * Method to shut down the apex engine. */ public void shutdown() throws ApexStarterException { diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpStateChangeMessageHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpStateChangeMessageHandler.java index 8658150c0..fd95b47b7 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpStateChangeMessageHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpStateChangeMessageHandler.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.services.onappf.handler; +import java.util.HashSet; import java.util.List; import org.onap.policy.apex.services.onappf.ApexStarterConstants; import org.onap.policy.apex.services.onappf.comm.PdpStatusPublisher; @@ -31,6 +32,7 @@ import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.enums.PdpResponseStatus; import org.onap.policy.models.pdp.enums.PdpState; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,25 +95,55 @@ public class PdpStateChangeMessageHandler { pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), PdpResponseStatus.SUCCESS, "State changed to active. No policies found."); } else { - try { - // assumed that the apex policies list contains only one entry. - final ApexEngineHandler apexEngineHandler = - new ApexEngineHandler(policies.get(0).getProperties().get("content")); - Registry.registerOrReplace(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, apexEngineHandler); - if (apexEngineHandler.isApexEngineRunning()) { - pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), + pdpResponseDetails = startApexEngine(pdpStateChangeMsg, pdpStatusContext, pdpMessageHandler, policies); + } + } + return pdpResponseDetails; + } + + /** + * Method to start apex engine. + * + * @param pdpStateChangeMsg pdp state change message + * @param pdpStatusContext pdp status in memory + * @param pdpMessageHandler the pdp message handler + * @param policies list of policies + * @return pdp response details + */ + private PdpResponseDetails startApexEngine(final PdpStateChange pdpStateChangeMsg, final PdpStatus pdpStatusContext, + final PdpMessageHandler pdpMessageHandler, final List<ToscaPolicy> policies) { + PdpResponseDetails pdpResponseDetails; + try { + final ApexEngineHandler apexEngineHandler = new ApexEngineHandler(policies); + Registry.registerOrReplace(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, apexEngineHandler); + if (apexEngineHandler.isApexEngineRunning()) { + List<ToscaPolicyIdentifier> runningPolicies = apexEngineHandler.getRunningPolicies(); + // only the policies which are succesfully executed should be there in the heartbeat + pdpStatusContext.setPolicies(runningPolicies); + if (new HashSet<>(runningPolicies) + .equals(new HashSet<>(pdpMessageHandler.getToscaPolicyIdentifiers(policies)))) { + pdpResponseDetails = + pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), PdpResponseStatus.SUCCESS, "Apex engine started. State changed to active."); - pdpStatusContext.setState(PdpState.ACTIVE); - } else { - pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), - PdpResponseStatus.FAIL, "Apex engine failed to start. State cannot be changed to active."); + } else { + StringBuilder message = new StringBuilder( + "Apex engine started. But, only the following polices are running - "); + for (ToscaPolicyIdentifier policy : runningPolicies) { + message.append(policy.getName()).append(":").append(policy.getVersion()).append(" "); } - } catch (final ApexStarterException e) { - LOGGER.error("Pdp update failed as the policies couldn't be undeployed.", e); - pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), - PdpResponseStatus.FAIL, "Apex engine service running failed. " + e.getMessage()); + message.append(". Other policies failed execution. Please see the logs for more details."); + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails( + pdpStateChangeMsg.getRequestId(), PdpResponseStatus.SUCCESS, message.toString()); } + pdpStatusContext.setState(PdpState.ACTIVE); + } else { + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), + PdpResponseStatus.FAIL, "Apex engine failed to start. State cannot be changed to active."); } + } catch (final ApexStarterException e) { + LOGGER.error("Pdp State Change failed.", e); + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpStateChangeMsg.getRequestId(), + PdpResponseStatus.FAIL, "Apex engine service running failed. " + e.getMessage()); } return pdpResponseDetails; } diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpUpdateMessageHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpUpdateMessageHandler.java index d807dc50e..ecc0bec21 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpUpdateMessageHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/PdpUpdateMessageHandler.java @@ -20,6 +20,8 @@ package org.onap.policy.apex.services.onappf.handler; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import org.onap.policy.apex.services.onappf.ApexStarterConstants; import org.onap.policy.apex.services.onappf.comm.PdpStatusPublisher; @@ -31,6 +33,7 @@ import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.concepts.PdpUpdate; import org.onap.policy.models.pdp.enums.PdpResponseStatus; import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,27 +57,11 @@ public class PdpUpdateMessageHandler { PdpResponseDetails pdpResponseDetails = null; if (pdpUpdateMsg.appliesTo(pdpStatusContext.getName(), pdpStatusContext.getPdpGroup(), pdpStatusContext.getPdpSubgroup())) { - final PdpStatusPublisher pdpStatusPublisher = Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER); if (checkIfAlreadyHandled(pdpUpdateMsg, pdpStatusContext)) { pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), PdpResponseStatus.SUCCESS, "Pdp already updated"); } else { - if (null != pdpUpdateMsg.getPdpHeartbeatIntervalMs() && pdpUpdateMsg.getPdpHeartbeatIntervalMs() > 0 - && pdpStatusPublisher.getInterval() != pdpUpdateMsg.getPdpHeartbeatIntervalMs()) { - updateInterval(pdpUpdateMsg.getPdpHeartbeatIntervalMs()); - } - pdpStatusContext.setPdpGroup(pdpUpdateMsg.getPdpGroup()); - pdpStatusContext.setPdpSubgroup(pdpUpdateMsg.getPdpSubgroup()); - pdpStatusContext - .setPolicies(new PdpMessageHandler().getToscaPolicyIdentifiers(pdpUpdateMsg.getPolicies())); - if (pdpStatusContext.getState().equals(PdpState.ACTIVE)) { - pdpResponseDetails = startOrStopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler); - } - Registry.registerOrReplace(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST, pdpUpdateMsg.getPolicies()); - if (null == pdpResponseDetails) { - pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), - PdpResponseStatus.SUCCESS, "Pdp update successful."); - } + pdpResponseDetails = handlePdpUpdate(pdpUpdateMsg, pdpMessageHandler, pdpStatusContext); } final PdpStatusPublisher pdpStatusPublisherTemp = Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER); @@ -86,6 +73,47 @@ public class PdpUpdateMessageHandler { } /** + * Method to do pdp update. + * + * @param pdpUpdateMsg the pdp update message + * @param pdpMessageHandler the message handler + * @param pdpStatusContext the pdp status in memory + * @return pdpResponseDetails the pdp response + */ + private PdpResponseDetails handlePdpUpdate(final PdpUpdate pdpUpdateMsg, final PdpMessageHandler pdpMessageHandler, + final PdpStatus pdpStatusContext) { + PdpResponseDetails pdpResponseDetails = null; + final PdpStatusPublisher pdpStatusPublisher = Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER); + if (null != pdpUpdateMsg.getPdpHeartbeatIntervalMs() && pdpUpdateMsg.getPdpHeartbeatIntervalMs() > 0 + && pdpStatusPublisher.getInterval() != pdpUpdateMsg.getPdpHeartbeatIntervalMs()) { + updateInterval(pdpUpdateMsg.getPdpHeartbeatIntervalMs()); + } + pdpStatusContext.setPdpGroup(pdpUpdateMsg.getPdpGroup()); + pdpStatusContext.setPdpSubgroup(pdpUpdateMsg.getPdpSubgroup()); + pdpStatusContext + .setPolicies(new PdpMessageHandler().getToscaPolicyIdentifiers(pdpUpdateMsg.getPolicies())); + Registry.registerOrReplace(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST, pdpUpdateMsg.getPolicies()); + if (pdpStatusContext.getState().equals(PdpState.ACTIVE)) { + pdpResponseDetails = startOrStopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler); + + ApexEngineHandler apexEngineHandler = Registry.get(ApexStarterConstants.REG_APEX_ENGINE_HANDLER); + // in hearbeat while in active state, only the policies which are running should be there. + // if some policy fails, that shouldn't go in the heartbeat. + // If no policies are running, then the policy list in the heartbeat can be empty + if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) { + pdpStatusContext.setPolicies(apexEngineHandler.getRunningPolicies()); + } else { + pdpStatusContext.setPolicies(Collections.emptyList()); + } + } + if (null == pdpResponseDetails) { + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), + PdpResponseStatus.SUCCESS, "Pdp update successful."); + } + return pdpResponseDetails; + } + + /** * Method to start or stop apex engine based on the list of policies received from pap. When current state is * active, if PAP sends PdpUpdate with empty policies list, stop apex engine, or, if there is a change in policies, * stop the current running policies and the deploy the new ones. @@ -118,6 +146,8 @@ public class PdpUpdateMessageHandler { if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) { try { apexEngineHandler.shutdown(); + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), + PdpResponseStatus.SUCCESS, "Pdp update successful. No policies are running."); } catch (final ApexStarterException e) { LOGGER.error("Pdp update failed as the policies couldn't be undeployed.", e); pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), @@ -134,12 +164,24 @@ public class PdpUpdateMessageHandler { if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) { apexEngineHandler.shutdown(); } - apexEngineHandler = - new ApexEngineHandler(pdpUpdateMsg.getPolicies().get(0).getProperties().get("content")); + apexEngineHandler = new ApexEngineHandler(pdpUpdateMsg.getPolicies()); Registry.registerOrReplace(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, apexEngineHandler); if (apexEngineHandler.isApexEngineRunning()) { - pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), - PdpResponseStatus.SUCCESS, "Apex engine started and policies are running."); + List<ToscaPolicyIdentifier> runningPolicies = apexEngineHandler.getRunningPolicies(); + if (new HashSet<>(runningPolicies) + .equals(new HashSet<>(pdpMessageHandler.getToscaPolicyIdentifiers(pdpUpdateMsg.getPolicies())))) { + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), + PdpResponseStatus.SUCCESS, "Apex engine started and policies are running."); + } else { + StringBuilder message = + new StringBuilder("Apex engine started. But, only the following polices are running - "); + for (ToscaPolicyIdentifier policy : runningPolicies) { + message.append(policy.getName()).append(":").append(policy.getVersion()).append(" "); + } + message.append(". Other policies failed execution. Please see the logs for more details."); + pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), + PdpResponseStatus.SUCCESS, message.toString()); + } } else { pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(), PdpResponseStatus.FAIL, "Apex engine failed to start."); |