diff options
Diffstat (limited to 'services')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java | 150 |
1 files changed, 66 insertions, 84 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java index a5de624d1..0ca7fe837 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,13 +62,12 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies - * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy - * is triggered by an Apex event, and when the policy is triggered it runs through to completion in - * the ApexEngine. + * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the + * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and + * when the policy is triggered it runs through to completion in the ApexEngine. * - * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it - * events, and receiving events from it. + * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving + * events from it. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -99,8 +99,8 @@ final class EngineWorker implements EngineService { private ApexEvent2EnEventConverter apexEnEventConverter = null; /** - * Constructor that creates an Apex engine, an event processor for events to be sent to that - * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB. + * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an + * {@link ApexModelReader} instance to read Apex models using JAXB. * * @param engineWorkerKey the engine worker key * @param queue the queue on which events for this Apex worker will come @@ -108,7 +108,7 @@ final class EngineWorker implements EngineService { * @throws ApexException thrown on errors on worker instantiation */ protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue, - final ApplicationThreadFactory threadFactory) { + final ApplicationThreadFactory threadFactory) { LOGGER.entry(engineWorkerKey); this.engineWorkerKey = engineWorkerKey; @@ -129,9 +129,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. - * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String, + * org.onap.policy.apex.service.engine.runtime.ApexEventListener) */ @Override public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { @@ -141,9 +140,7 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. - * String) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String) */ @Override public void deregisterActionListener(final String listenerName) { @@ -153,13 +150,12 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() */ @Override public EngineServiceEventInterface getEngineServiceEventInterface() { throw new UnsupportedOperationException( - "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); + "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); } /* @@ -199,13 +195,12 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag) - throws ApexException { + throws ApexException { LOGGER.entry(engineKey); // Read the Apex model into memory using the Apex Model Reader @@ -227,20 +222,18 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey, - * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag) - throws ApexException { + throws ApexException { LOGGER.entry(engineKey); // Check if the key on the update request is correct if (!engineWorkerKey.equals(engineKey)) { String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX; + + ENGINE_SUFFIX; LOGGER.warn(message); throw new ApexException(message); } @@ -252,19 +245,18 @@ final class EngineWorker implements EngineService { if (!currentModel.getKey().isCompatible(apexModel.getKey())) { if (forceFlag) { LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId() - + "\" is not a compatible model update from the existing engine model with key \"" - + currentModel.getKey().getId() + "\""); + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getId() + "\""); } else { throw new ContextException( - "apex model update failed, supplied model with key \"" + apexModel.getKey().getId() - + "\" is not a compatible model update from the existing engine model with key \"" - + currentModel.getKey().getId() + "\""); + "apex model update failed, supplied model with key \"" + apexModel.getKey().getId() + + "\" is not a compatible model update from the existing engine model with key \"" + + currentModel.getKey().getId() + "\""); } } } // Update the Apex model in the Apex engine - engine.clear(); engine.updateModel(apexModel); LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey); @@ -294,9 +286,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void start(final AxArtifactKey engineKey) throws ApexException { @@ -304,16 +295,16 @@ final class EngineWorker implements EngineService { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Starts the event processing thread that handles incoming events if (processorThread != null && processorThread.isAlive()) { String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " - + getState(); + + getState(); LOGGER.error(message); throw new ApexException(message); } @@ -341,26 +332,25 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void stop(final AxArtifactKey engineKey) throws ApexException { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Interrupt the worker to stop its thread if (processorThread == null || !processorThread.isAlive()) { processorThread = null; - LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " - + getState()); + LOGGER + .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState()); return; } @@ -387,24 +377,22 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. - * model. concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. model. + * concepts.AxArtifactKey) */ @Override public void clear(final AxArtifactKey engineKey) throws ApexException { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { - LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() - + ENGINE_SUFFIX); - throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG - + engineWorkerKey.getId() + ENGINE_SUFFIX); + LOGGER.warn( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); + throw new ApexException( + ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX); } // Interrupt the worker to stop its thread if (processorThread != null && !processorThread.isAlive()) { - LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " - + getState()); + LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState()); return; } @@ -427,9 +415,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStarted(final AxArtifactKey engineKey) { @@ -461,9 +448,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. - * model. basicmodel.concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. model. + * basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStopped(final AxArtifactKey engineKey) { @@ -505,9 +491,8 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core - * .model .concepts.AxArtifactKey) + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core .model + * .concepts.AxArtifactKey) */ @Override public String getStatus(final AxArtifactKey engineKey) { @@ -531,8 +516,7 @@ final class EngineWorker implements EngineService { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex + * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex * .core.model.concepts.AxArtifactKey) */ @Override @@ -570,20 +554,19 @@ final class EngineWorker implements EngineService { runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey())); runtimeJsonStringBuilder.append(",\"AlbumContent\":["); - // Get the schema helper to use to marshal context album objects to JSON - final AxContextAlbum axContextAlbum = - ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey()); + final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class) + .get(contextAlbumEntry.getKey()); SchemaHelper schemaHelper = null; try { // Get a schema helper to manage the translations between objects on the album map // for this album schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(), - axContextAlbum.getItemSchema()); + axContextAlbum.getItemSchema()); } catch (final ContextRuntimeException e) { - final String resultString = - "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON"; + final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum + + "\" to JSON"; LOGGER.warn(resultString, e); // End of context album entry @@ -626,9 +609,9 @@ final class EngineWorker implements EngineService { } /** - * This is an event processor thread, this class decouples the events handling logic from core - * business logic. This class runs its own thread and continuously querying the blocking queue - * for the events that have been sent to the worker for processing by the Apex engine. + * This is an event processor thread, this class decouples the events handling logic from core business logic. This + * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the + * worker for processing by the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -640,8 +623,7 @@ final class EngineWorker implements EngineService { /** * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects. * - * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger - * events. + * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events. */ EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) { this.eventProcessingQueue = eventProcessingQueue; @@ -659,7 +641,7 @@ final class EngineWorker implements EngineService { // Take events from the event processing queue of the worker and pass them to the engine // for processing boolean stopFlag = false; - while (processorThread != null && !processorThread.isInterrupted() && ! stopFlag) { + while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) { ApexEvent event = null; try { event = eventProcessingQueue.take(); @@ -673,7 +655,7 @@ final class EngineWorker implements EngineService { try { if (event != null) { debugEventIfDebugEnabled(event); - + final EnEvent enevent = apexEnEventConverter.fromApexEvent(event); engine.handleEvent(enevent); } |