summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main/java')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java143
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java46
2 files changed, 111 insertions, 78 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
index c99987542..ee1a5dcdf 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
@@ -66,6 +66,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Recurring string constants
private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
private static final String NOT_FOUND_SUFFIX = " not found in engine service";
+ private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
// Constants for timing
private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
@@ -95,23 +96,17 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
* constructor is private to prevent subclassing.
*
* @param engineServiceKey the engine service key
- * @param incomingThreadCount the thread count, the number of engine workers to start
+ * @param threadCount the thread count, the number of engine workers to start
* @param periodicEventPeriod the period in milliseconds at which periodic events are generated
* @throws ApexException on worker instantiation errors
*/
- private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
+ private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
final long periodicEventPeriod) {
- LOGGER.entry(engineServiceKey, incomingThreadCount);
+ LOGGER.entry(engineServiceKey, threadCount);
this.engineServiceKey = engineServiceKey;
this.periodicEventPeriod = periodicEventPeriod;
- int threadCount = incomingThreadCount;
- if (threadCount <= 0) {
- // Just start one engine worker
- threadCount = 1;
- }
-
// Start engine workers
for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
@@ -137,22 +132,18 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
if (config == null) {
LOGGER.warn("Engine service configuration parameters is null");
- throw new ApexException("engine service configuration parameters is null");
+ throw new ApexException("engine service configuration parameters are null");
}
+
final GroupValidationResult validation = config.validate();
if (!validation.isValid()) {
LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
throw new ApexException("Invalid engine service configuration parameters: " + validation);
}
+
final AxArtifactKey engineServiceKey = config.getEngineKey();
final int threadCount = config.getInstanceCount();
- // Check if the Apex model specified is sane
- if (engineServiceKey == null) {
- LOGGER.warn("engine service key is null");
- throw new ApexException("engine service key is null");
- }
-
return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
}
@@ -166,6 +157,18 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
LOGGER.entry(apexEventListener);
+ if (listenerName == null) {
+ String message = "listener name must be specified and may not be null";
+ LOGGER.warn(message);
+ return;
+ }
+
+ if (apexEventListener == null) {
+ String message = "apex event listener must be specified and may not be null";
+ LOGGER.warn(message);
+ return;
+ }
+
// Register the Apex event listener on all engine workers, each worker will return Apex
// events to the listening application
for (final EngineService engineWorker : engineWorkerMap.values()) {
@@ -248,6 +251,13 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
@Override
public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
final boolean forceFlag) throws ApexException {
+ // Check if the engine service key specified is sane
+ if (incomingEngineServiceKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if the Apex model specified is sane
if (apexModelString == null || apexModelString.trim().length() == 0) {
String emptyModelMessage = "model for updating engine service with key "
@@ -267,12 +277,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
throw new ApexException(message, e);
}
- if (apexPolicyModel == null) {
- String message = "apex model null on engine service " + incomingEngineServiceKey.getId();
- LOGGER.error(message);
- throw new ApexException(message);
- }
-
// Update the model
updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
@@ -290,6 +294,13 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
final boolean forceFlag) throws ApexException {
LOGGER.entry(incomingEngineServiceKey);
+ // Check if the engine service key specified is sane
+ if (incomingEngineServiceKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if the Apex model specified is sane
if (apexModel == null) {
LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
@@ -315,6 +326,22 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
}
}
+ executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
+
+ LOGGER.exit();
+ }
+
+ /**
+ * Execute the model update on the engine instances.
+ *
+ * @param incomingEngineServiceKey the engine service key to update
+ * @param apexModel the model to update the engines with
+ * @param forceFlag if true, ignore compatibility problems
+ * @throws ApexException on model update errors
+ */
+ private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
+ final boolean forceFlag) throws ApexException {
+
if (!isStopped()) {
stopEngines(incomingEngineServiceKey);
}
@@ -349,8 +376,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
LOGGER.warn(errorString);
throw new ApexException(errorString);
}
-
- LOGGER.exit();
}
/**
@@ -432,11 +457,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
for (final EngineService engine : engineWorkerMap.values()) {
start(engine.getKey());
}
-
- // Check if periodic events should be turned on
- if (periodicEventPeriod > 0) {
- startPeriodicEvents(periodicEventPeriod);
- }
}
/*
@@ -449,6 +469,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void start(final AxArtifactKey engineKey) throws ApexException {
LOGGER.entry(engineKey);
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
@@ -458,6 +484,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Start the engine
engineWorkerMap.get(engineKey).start(engineKey);
+
+ // Check if periodic events should be turned on
+ if (periodicEventPeriod > 0) {
+ startPeriodicEvents(periodicEventPeriod);
+ }
LOGGER.exit(engineKey);
}
@@ -471,6 +502,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void stop() throws ApexException {
LOGGER.entry();
+ if (periodicEventGenerator != null) {
+ periodicEventGenerator.cancel();
+ periodicEventGenerator = null;
+ }
+
// Stop each engine
for (final EngineService engine : engineWorkerMap.values()) {
if (engine.getState() != AxEngineState.STOPPED) {
@@ -491,6 +527,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void stop(final AxArtifactKey engineKey) throws ApexException {
LOGGER.entry(engineKey);
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
@@ -532,6 +574,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
public void clear(final AxArtifactKey engineKey) throws ApexException {
LOGGER.entry(engineKey);
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
@@ -570,9 +618,16 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
*/
@Override
public boolean isStarted(final AxArtifactKey engineKey) {
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ return false;
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ return false;
}
return engineWorkerMap.get(engineKey).isStarted();
}
@@ -601,9 +656,16 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
*/
@Override
public boolean isStopped(final AxArtifactKey engineKey) {
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ return true;
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
+ return true;
}
return engineWorkerMap.get(engineKey).isStopped();
}
@@ -647,6 +709,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
// Stop periodic events
periodicEventGenerator.cancel();
periodicEventGenerator = null;
+ periodicEventPeriod = 0;
}
/*
@@ -657,6 +720,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
*/
@Override
public String getStatus(final AxArtifactKey engineKey) throws ApexException {
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
@@ -675,6 +744,12 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
*/
@Override
public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
+ if (engineKey == null) {
+ String message = ENGINE_KEY_NOT_SPECIFIED;
+ LOGGER.warn(message);
+ throw new ApexException(message);
+ }
+
// Check if we have this key on our map
if (!engineWorkerMap.containsKey(engineKey)) {
LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
@@ -693,6 +768,11 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
*/
@Override
public void sendEvent(final ApexEvent event) {
+ if (event == null) {
+ LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
+ return;
+ }
+
// Check if we have this key on our map
if (getState() == AxEngineState.STOPPED) {
LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
@@ -700,11 +780,6 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
return;
}
- if (event == null) {
- LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
- return;
- }
-
if (DEBUG_ENABLED) {
LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
index 56b3b84c4..a7d179959 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
@@ -76,7 +76,6 @@ final class EngineWorker implements EngineService {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
// Recurring string constants
- private static final String IS_NULL_SUFFIX = " is null";
private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
private static final String ENGINE_SUFFIX = " of this engine";
private static final String BAD_KEY_MATCH_TAG = " does not match the key";
@@ -108,7 +107,7 @@ final class EngineWorker implements EngineService {
* @param threadFactory the thread factory to use for creating the event processing thread
* @throws ApexException thrown on errors on worker instantiation
*/
- EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
+ protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
final ApplicationThreadFactory threadFactory) {
LOGGER.entry(engineWorkerKey);
@@ -136,13 +135,6 @@ final class EngineWorker implements EngineService {
*/
@Override
public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
- // Sanity checks on the Apex model
- if (engine == null) {
- LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getId()
- + ", failed, listener is null");
- return;
- }
-
engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
}
@@ -155,13 +147,6 @@ final class EngineWorker implements EngineService {
*/
@Override
public void deregisterActionListener(final String listenerName) {
- // Sanity checks on the Apex model
- if (engine == null) {
- LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getId()
- + ", failed, listener is null");
- return;
- }
-
engine.removeEventListener(listenerName);
}
@@ -233,11 +218,6 @@ final class EngineWorker implements EngineService {
throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
}
- if (apexPolicyModel == null) {
- LOGGER.error("apex model null on engine " + engineKey.getId());
- throw new ApexException("apex model null on engine " + engineKey.getId());
- }
-
// Update the Apex model in the Apex engine
updateModel(engineKey, apexPolicyModel, forceFlag);
@@ -265,12 +245,6 @@ final class EngineWorker implements EngineService {
throw new ApexException(message);
}
- // Sanity checks on the Apex model
- if (engine == null) {
- LOGGER.warn("engine with key " + engineKey.getId() + " not initialized");
- throw new ApexException("engine with key " + engineKey.getId() + " not initialized");
- }
-
// Check model compatibility
if (ModelService.existsModel(AxPolicyModel.class)) {
// The current policy model may or may not be defined
@@ -336,12 +310,6 @@ final class EngineWorker implements EngineService {
+ engineWorkerKey.getId() + ENGINE_SUFFIX);
}
- if (engine == null) {
- String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
- LOGGER.error(message);
- throw new ApexException(message);
- }
-
// Starts the event processing thread that handles incoming events
if (processorThread != null && processorThread.isAlive()) {
String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
@@ -387,12 +355,6 @@ final class EngineWorker implements EngineService {
+ engineWorkerKey.getId() + ENGINE_SUFFIX);
}
- if (engine == null) {
- String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
- LOGGER.error(message);
- throw new ApexException(message);
- }
-
// Interrupt the worker to stop its thread
if (processorThread == null || !processorThread.isAlive()) {
processorThread = null;
@@ -439,11 +401,6 @@ final class EngineWorker implements EngineService {
+ engineWorkerKey.getId() + ENGINE_SUFFIX);
}
- if (engine == null) {
- LOGGER.error(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
- throw new ApexException(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
- }
-
// Interrupt the worker to stop its thread
if (processorThread != null && !processorThread.isAlive()) {
LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state "
@@ -562,6 +519,7 @@ final class EngineWorker implements EngineService {
try {
final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
+ modelWriter.setJsonOutput(true);
modelWriter.write(apexEngineModel, baOutputStream);
return baOutputStream.toString();
} catch (final Exception e) {