diff options
Diffstat (limited to 'services/services-engine/src/main')
2 files changed, 111 insertions, 78 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java index c99987542..ee1a5dcdf 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java @@ -66,6 +66,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven // Recurring string constants private static final String ENGINE_KEY_PREAMBLE = "engine with key "; private static final String NOT_FOUND_SUFFIX = " not found in engine service"; + private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null"; // Constants for timing private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds @@ -95,23 +96,17 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven * constructor is private to prevent subclassing. * * @param engineServiceKey the engine service key - * @param incomingThreadCount the thread count, the number of engine workers to start + * @param threadCount the thread count, the number of engine workers to start * @param periodicEventPeriod the period in milliseconds at which periodic events are generated * @throws ApexException on worker instantiation errors */ - private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount, + private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount, final long periodicEventPeriod) { - LOGGER.entry(engineServiceKey, incomingThreadCount); + LOGGER.entry(engineServiceKey, threadCount); this.engineServiceKey = engineServiceKey; this.periodicEventPeriod = periodicEventPeriod; - int threadCount = incomingThreadCount; - if (threadCount <= 0) { - // Just start one engine worker - threadCount = 1; - } - // Start engine workers for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) { final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter, @@ -137,22 +132,18 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException { if (config == null) { LOGGER.warn("Engine service configuration parameters is null"); - throw new ApexException("engine service configuration parameters is null"); + throw new ApexException("engine service configuration parameters are null"); } + final GroupValidationResult validation = config.validate(); if (!validation.isValid()) { LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult()); throw new ApexException("Invalid engine service configuration parameters: " + validation); } + final AxArtifactKey engineServiceKey = config.getEngineKey(); final int threadCount = config.getInstanceCount(); - // Check if the Apex model specified is sane - if (engineServiceKey == null) { - LOGGER.warn("engine service key is null"); - throw new ApexException("engine service key is null"); - } - return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod()); } @@ -166,6 +157,18 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { LOGGER.entry(apexEventListener); + if (listenerName == null) { + String message = "listener name must be specified and may not be null"; + LOGGER.warn(message); + return; + } + + if (apexEventListener == null) { + String message = "apex event listener must be specified and may not be null"; + LOGGER.warn(message); + return; + } + // Register the Apex event listener on all engine workers, each worker will return Apex // events to the listening application for (final EngineService engineWorker : engineWorkerMap.values()) { @@ -248,6 +251,13 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven @Override public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString, final boolean forceFlag) throws ApexException { + // Check if the engine service key specified is sane + if (incomingEngineServiceKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if the Apex model specified is sane if (apexModelString == null || apexModelString.trim().length() == 0) { String emptyModelMessage = "model for updating engine service with key " @@ -267,12 +277,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven throw new ApexException(message, e); } - if (apexPolicyModel == null) { - String message = "apex model null on engine service " + incomingEngineServiceKey.getId(); - LOGGER.error(message); - throw new ApexException(message); - } - // Update the model updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag); @@ -290,6 +294,13 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven final boolean forceFlag) throws ApexException { LOGGER.entry(incomingEngineServiceKey); + // Check if the engine service key specified is sane + if (incomingEngineServiceKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if the Apex model specified is sane if (apexModel == null) { LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId() @@ -315,6 +326,22 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven } } + executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag); + + LOGGER.exit(); + } + + /** + * Execute the model update on the engine instances. + * + * @param incomingEngineServiceKey the engine service key to update + * @param apexModel the model to update the engines with + * @param forceFlag if true, ignore compatibility problems + * @throws ApexException on model update errors + */ + private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel, + final boolean forceFlag) throws ApexException { + if (!isStopped()) { stopEngines(incomingEngineServiceKey); } @@ -349,8 +376,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven LOGGER.warn(errorString); throw new ApexException(errorString); } - - LOGGER.exit(); } /** @@ -432,11 +457,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven for (final EngineService engine : engineWorkerMap.values()) { start(engine.getKey()); } - - // Check if periodic events should be turned on - if (periodicEventPeriod > 0) { - startPeriodicEvents(periodicEventPeriod); - } } /* @@ -449,6 +469,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public void start(final AxArtifactKey engineKey) throws ApexException { LOGGER.entry(engineKey); + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX; @@ -458,6 +484,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven // Start the engine engineWorkerMap.get(engineKey).start(engineKey); + + // Check if periodic events should be turned on + if (periodicEventPeriod > 0) { + startPeriodicEvents(periodicEventPeriod); + } LOGGER.exit(engineKey); } @@ -471,6 +502,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public void stop() throws ApexException { LOGGER.entry(); + if (periodicEventGenerator != null) { + periodicEventGenerator.cancel(); + periodicEventGenerator = null; + } + // Stop each engine for (final EngineService engine : engineWorkerMap.values()) { if (engine.getState() != AxEngineState.STOPPED) { @@ -491,6 +527,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public void stop(final AxArtifactKey engineKey) throws ApexException { LOGGER.entry(engineKey); + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); @@ -532,6 +574,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven public void clear(final AxArtifactKey engineKey) throws ApexException { LOGGER.entry(engineKey); + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); @@ -570,9 +618,16 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven */ @Override public boolean isStarted(final AxArtifactKey engineKey) { + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + return false; + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); + return false; } return engineWorkerMap.get(engineKey).isStarted(); } @@ -601,9 +656,16 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven */ @Override public boolean isStopped(final AxArtifactKey engineKey) { + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + return true; + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); + return true; } return engineWorkerMap.get(engineKey).isStopped(); } @@ -647,6 +709,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven // Stop periodic events periodicEventGenerator.cancel(); periodicEventGenerator = null; + periodicEventPeriod = 0; } /* @@ -657,6 +720,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven */ @Override public String getStatus(final AxArtifactKey engineKey) throws ApexException { + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); @@ -675,6 +744,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven */ @Override public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException { + if (engineKey == null) { + String message = ENGINE_KEY_NOT_SPECIFIED; + LOGGER.warn(message); + throw new ApexException(message); + } + // Check if we have this key on our map if (!engineWorkerMap.containsKey(engineKey)) { LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX); @@ -693,6 +768,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven */ @Override public void sendEvent(final ApexEvent event) { + if (event == null) { + LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId()); + return; + } + // Check if we have this key on our map if (getState() == AxEngineState.STOPPED) { LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service " @@ -700,11 +780,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven return; } - if (event == null) { - LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId()); - return; - } - if (DEBUG_ENABLED) { LOGGER.debug("Forwarding Apex Event {} to the processing engine", event); } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java index 56b3b84c4..a7d179959 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java @@ -76,7 +76,6 @@ final class EngineWorker implements EngineService { private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class); // Recurring string constants - private static final String IS_NULL_SUFFIX = " is null"; private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key "; private static final String ENGINE_SUFFIX = " of this engine"; private static final String BAD_KEY_MATCH_TAG = " does not match the key"; @@ -108,7 +107,7 @@ final class EngineWorker implements EngineService { * @param threadFactory the thread factory to use for creating the event processing thread * @throws ApexException thrown on errors on worker instantiation */ - EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue, + protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue, final ApplicationThreadFactory threadFactory) { LOGGER.entry(engineWorkerKey); @@ -136,13 +135,6 @@ final class EngineWorker implements EngineService { */ @Override public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { - // Sanity checks on the Apex model - if (engine == null) { - LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getId() - + ", failed, listener is null"); - return; - } - engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter)); } @@ -155,13 +147,6 @@ final class EngineWorker implements EngineService { */ @Override public void deregisterActionListener(final String listenerName) { - // Sanity checks on the Apex model - if (engine == null) { - LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getId() - + ", failed, listener is null"); - return; - } - engine.removeEventListener(listenerName); } @@ -233,11 +218,6 @@ final class EngineWorker implements EngineService { throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e); } - if (apexPolicyModel == null) { - LOGGER.error("apex model null on engine " + engineKey.getId()); - throw new ApexException("apex model null on engine " + engineKey.getId()); - } - // Update the Apex model in the Apex engine updateModel(engineKey, apexPolicyModel, forceFlag); @@ -265,12 +245,6 @@ final class EngineWorker implements EngineService { throw new ApexException(message); } - // Sanity checks on the Apex model - if (engine == null) { - LOGGER.warn("engine with key " + engineKey.getId() + " not initialized"); - throw new ApexException("engine with key " + engineKey.getId() + " not initialized"); - } - // Check model compatibility if (ModelService.existsModel(AxPolicyModel.class)) { // The current policy model may or may not be defined @@ -336,12 +310,6 @@ final class EngineWorker implements EngineService { + engineWorkerKey.getId() + ENGINE_SUFFIX); } - if (engine == null) { - String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null"; - LOGGER.error(message); - throw new ApexException(message); - } - // Starts the event processing thread that handles incoming events if (processorThread != null && processorThread.isAlive()) { String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " @@ -387,12 +355,6 @@ final class EngineWorker implements EngineService { + engineWorkerKey.getId() + ENGINE_SUFFIX); } - if (engine == null) { - String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null"; - LOGGER.error(message); - throw new ApexException(message); - } - // Interrupt the worker to stop its thread if (processorThread == null || !processorThread.isAlive()) { processorThread = null; @@ -439,11 +401,6 @@ final class EngineWorker implements EngineService { + engineWorkerKey.getId() + ENGINE_SUFFIX); } - if (engine == null) { - LOGGER.error(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX); - throw new ApexException(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX); - } - // Interrupt the worker to stop its thread if (processorThread != null && !processorThread.isAlive()) { LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " @@ -562,6 +519,7 @@ final class EngineWorker implements EngineService { try { final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class); + modelWriter.setJsonOutput(true); modelWriter.write(apexEngineModel, baOutputStream); return baOutputStream.toString(); } catch (final Exception e) { |