From f702224c07abd43d72ab0198cf760537d3d35d8a Mon Sep 17 00:00:00 2001 From: liamfallon Date: Wed, 9 Jan 2019 14:30:23 +0100 Subject: Fix context clearing on model upgrade Existing context is cleared when a model upgrade is carried out on an Apex PDP. The clear() method is called in error on the Apex engine during the upgrade. This change also has some small updates to the DecisionMaker example, which is used to test this feature. Issue-ID: POLICY-1395 Change-Id: I547a3a60712d1572d9daadf5eab4afcae2a5c321 Signed-off-by: liamfallon (cherry picked from commit cff079a8dc3eeaf4e88c0e89a828fe4244a1b3c8) --- .../service/engine/runtime/impl/EngineWorker.java | 150 +++++++++------------ 1 file changed, 66 insertions(+), 84 deletions(-) (limited to 'services/services-engine/src/main/java') diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java index a5de624d1..0ca7fe837 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,13 +62,12 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies - * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy - * is triggered by an Apex event, and when the policy is triggered it runs through to completion in - * the ApexEngine. + * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the + * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and + * when the policy is triggered it runs through to completion in the ApexEngine. * - *

This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it - * events, and receiving events from it. + *

This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving + * events from it. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -99,8 +99,8 @@ final class EngineWorker implements EngineService { private ApexEvent2EnEventConverter apexEnEventConverter = null; /** - * Constructor that creates an Apex engine, an event processor for events to be sent to that - * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB. + * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an + * {@link ApexModelReader} instance to read Apex models using JAXB. * * @param engineWorkerKey the engine worker key * @param queue the queue on which events for this Apex worker will come @@ -108,7 +108,7 @@ final class EngineWorker implements EngineService { * @throws ApexException thrown on errors on worker instantiation */ protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue queue, - final ApplicationThreadFactory threadFactory) { + final ApplicationThreadFactory threadFactory) { LOGGER.entry(engineWorkerKey); this.engineWorkerKey = engineWorkerKey; @@ -129,9 +129,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. - * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String, + * org.onap.policy.apex.service.engine.runtime.ApexEventListener) */ @Override public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { @@ -141,9 +140,7 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. - * String) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String) */ @Override public void deregisterActionListener(final String listenerName) { @@ -153,13 +150,12 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() */ @Override public EngineServiceEventInterface getEngineServiceEventInterface() { throw new UnsupportedOperationException( - "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); + "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); } /* @@ -199,13 +195,12 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag) - throws ApexException { + throws ApexException { LOGGER.entry(engineKey); // Read the Apex model into memory using the Apex Model Reader @@ -227,20 +222,18 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey, - * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag) - throws ApexException { + throws ApexException { LOGGER.entry(engineKey); // Check if the key on the update request is correct if (!engineWorkerKey.equals(engineKey)) { String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX; + + ENGINE_SUFFIX; LOGGER.warn(message); throw new ApexException(message); } @@ -252,19 +245,18 @@ final class EngineWorker implements EngineService { if (!currentModel.getKey().isCompatible(apexModel.getKey())) { if (forceFlag) { LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId() - + "\" is not a compatible model update from the existing engine model with key \"" - + currentModel.getKey().getId() + "\""); + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getId() + "\""); } else { throw new ContextException( - "apex model update failed, supplied model with key \"" + apexModel.getKey().getId() - + "\" is not a compatible model update from the existing engine model with key \"" - + currentModel.getKey().getId() + "\""); + "apex model update failed, supplied model with key \"" + apexModel.getKey().getId() + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getId() + "\""); } } } // Update the Apex model in the Apex engine - engine.clear(); engine.updateModel(apexModel); LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey); @@ -294,9 +286,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void start(final AxArtifactKey engineKey) throws ApexException { @@ -304,16 +295,16 @@ final class EngineWorker implements EngineService { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Starts the event processing thread that handles incoming events if (processorThread != null && processorThread.isAlive()) { String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " - + getState(); + + getState(); LOGGER.error(message); throw new ApexException(message); } @@ -341,26 +332,25 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void stop(final AxArtifactKey engineKey) throws ApexException { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Interrupt the worker to stop its thread if (processorThread == null || !processorThread.isAlive()) { processorThread = null; - LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " - + getState()); + LOGGER + .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState()); return; } @@ -387,24 +377,22 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void clear(final AxArtifactKey engineKey) throws ApexException { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Interrupt the worker to stop its thread if (processorThread != null && !processorThread.isAlive()) { - LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " - + getState()); + LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState()); return; } @@ -427,9 +415,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStarted(final AxArtifactKey engineKey) { @@ -461,9 +448,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStopped(final AxArtifactKey engineKey) { @@ -505,9 +491,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core - * .model .concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core .model + * .concepts.AxArtifactKey) */ @Override public String getStatus(final AxArtifactKey engineKey) { @@ -531,8 +516,7 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex * .core.model.concepts.AxArtifactKey) */ @Override @@ -570,20 +554,19 @@ final class EngineWorker implements EngineService { runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey())); runtimeJsonStringBuilder.append(",\"AlbumContent\":["); - // Get the schema helper to use to marshal context album objects to JSON - final AxContextAlbum axContextAlbum = - ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey()); + final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class) + .get(contextAlbumEntry.getKey()); SchemaHelper schemaHelper = null; try { // Get a schema helper to manage the translations between objects on the album map // for this album schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(), - axContextAlbum.getItemSchema()); + axContextAlbum.getItemSchema()); } catch (final ContextRuntimeException e) { - final String resultString = - "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON"; + final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum + + "\" to JSON"; LOGGER.warn(resultString, e); // End of context album entry @@ -626,9 +609,9 @@ final class EngineWorker implements EngineService { } /** - * This is an event processor thread, this class decouples the events handling logic from core - * business logic. This class runs its own thread and continuously querying the blocking queue - * for the events that have been sent to the worker for processing by the Apex engine. + * This is an event processor thread, this class decouples the events handling logic from core business logic. This + * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the + * worker for processing by the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -640,8 +623,7 @@ final class EngineWorker implements EngineService { /** * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects. * - * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger - * events. + * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events. */ EventProcessor(final BlockingQueue eventProcessingQueue) { this.eventProcessingQueue = eventProcessingQueue; @@ -659,7 +641,7 @@ final class EngineWorker implements EngineService { // Take events from the event processing queue of the worker and pass them to the engine // for processing boolean stopFlag = false; - while (processorThread != null && !processorThread.isInterrupted() && ! stopFlag) { + while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) { ApexEvent event = null; try { event = eventProcessingQueue.take(); @@ -673,7 +655,7 @@ final class EngineWorker implements EngineService { try { if (event != null) { debugEventIfDebugEnabled(event); - + final EnEvent enevent = apexEnEventConverter.fromApexEvent(event); engine.handleEvent(enevent); } -- cgit 1.2.3-korg