summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java326
1 files changed, 218 insertions, 108 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
index a9b862d41..261ba41cf 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
@@ -151,20 +151,26 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
public void run() {
// Take messages off the queue and forward them to the Apex engine
while (messageListenerThread.isAlive() && !stopOrderedFlag) {
- try {
- final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
- if (data != null) {
- final List<Message> messages = data.getMessages();
- for (final Message message : messages) {
- handleMessage(message, data.getConnection());
- }
+ pollAndHandleMessage();
+ }
+ }
+
+ /**
+ * Poll the queue for a message and handle that message.
+ */
+ private void pollAndHandleMessage() {
+ try {
+ final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (data != null) {
+ final List<Message> messages = data.getMessages();
+ for (final Message message : messages) {
+ handleMessage(message, data.getConnection());
}
- } catch (final InterruptedException e) {
- // restore the interrupt status
- Thread.currentThread().interrupt();
- LOGGER.debug("message listener execution has been interrupted");
- break;
}
+ } catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
+ LOGGER.debug("message listener execution has been interrupted");
}
}
@@ -194,102 +200,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
+ "action on received message invalid, action must be of type \"EnDepAction\"");
}
- // Handle each incoming message using the inevitable switch statement for the EngDep
- // protocol
- switch (enDepAction) {
- case GET_ENGINE_SERVICE_INFO:
- final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
- LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
- + " . . .");
- // Send a reply with the engine service information
- sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
- apexService.getEngineKeys(), apexService.getApexModelKey());
- LOGGER.debug("returned engine service information for engine service "
- + apexService.getKey().getId());
- break;
-
- case UPDATE_MODEL:
- final UpdateModel updateModelMessage = (UpdateModel) message;
- LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
- // Update the model
- apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
- updateModelMessage.isForceInstall());
- // Send a reply indicating the message action worked
- sendReply(webSocket, updateModelMessage, true,
- "updated model in engine " + updateModelMessage.getTarget().getId());
- LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
- break;
-
- case START_ENGINE:
- final StartEngine startEngineMessage = (StartEngine) message;
- LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
- // Start the engine
- apexService.start(startEngineMessage.getTarget());
- // Send a reply indicating the message action worked
- sendReply(webSocket, startEngineMessage, true,
- "started engine " + startEngineMessage.getTarget().getId());
- LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
- break;
-
- case STOP_ENGINE:
- final StopEngine stopEngineMessage = (StopEngine) message;
- LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
- // Stop the engine
- apexService.stop(stopEngineMessage.getTarget());
- // Send a reply indicating the message action worked
- sendReply(webSocket, stopEngineMessage, true,
- "stopped engine " + stopEngineMessage.getTarget().getId());
- LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
- break;
-
- case START_PERIODIC_EVENTS:
- final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
- LOGGER.debug("starting periodic events on engine {} . . .",
- startPeriodicEventsMessage.getTarget().getId());
- // Start periodic events with the period specified in the message
- final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
- apexService.startPeriodicEvents(period);
- // Send a reply indicating the message action worked
- String periodicStartedMessage = "started periodic events on engine "
- + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
- sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
- LOGGER.debug(periodicStartedMessage);
- break;
-
- case STOP_PERIODIC_EVENTS:
- final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
- LOGGER.debug("stopping periodic events on engine {} . . .",
- stopPeriodicEventsMessage.getTarget().getId());
- // Stop periodic events
- apexService.stopPeriodicEvents();
- // Send a reply indicating the message action worked
- sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
- + stopPeriodicEventsMessage.getTarget().getId());
- LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
- break;
-
- case GET_ENGINE_STATUS:
- final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
- LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
- // Send a reply with the engine status
- sendReply(webSocket, getEngineStatusMessage, true,
- apexService.getStatus(getEngineStatusMessage.getTarget()));
- LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
- break;
-
- case GET_ENGINE_INFO:
- final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
- LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
- // Send a reply with the engine runtime information
- sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
- LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
- break;
- case RESPONSE:
- throw new ApexException("RESPONSE action on received message not handled by engine");
-
- default:
- break;
- }
+ handleIncomingMessages(message, webSocket, enDepAction);
} catch (final ApexException e) {
LOGGER.warn("apex failed to execute message", e);
sendReply(webSocket, message, false, e.getCascadedMessage());
@@ -301,6 +212,205 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable
}
/**
+ * Handle incoming EngDep messages.
+ *
+ * @param message the incoming message
+ * @param webSocket the web socket the message came in on
+ * @param enDepAction the action from the message
+ * @throws ApexException on message handling errors
+ */
+ private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction)
+ throws ApexException {
+ // Handle each incoming message using the inevitable switch statement for the EngDep
+ // protocol
+ switch (enDepAction) {
+ case GET_ENGINE_SERVICE_INFO:
+ handleGetEngineServiceInfoMessage(message, webSocket);
+ break;
+
+ case UPDATE_MODEL:
+ handleUpdateModelMessage(message, webSocket);
+ break;
+
+ case START_ENGINE:
+ handleStartEngineMessage(message, webSocket);
+ break;
+
+ case STOP_ENGINE:
+ handleStopEngineMessage(message, webSocket);
+ break;
+
+ case START_PERIODIC_EVENTS:
+ handleStartPeriodicEventsMessage(message, webSocket);
+ break;
+
+ case STOP_PERIODIC_EVENTS:
+ handleStopPeriodicEventsMessage(message, webSocket);
+ break;
+
+ case GET_ENGINE_STATUS:
+ handleEngineStatusMessage(message, webSocket);
+ break;
+
+ case GET_ENGINE_INFO:
+ handleEngineInfoMessage(message, webSocket);
+ break;
+
+ case RESPONSE:
+ throw new ApexException("RESPONSE action on received message not handled by engine");
+
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Handle the get engine service information message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) {
+ final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
+ LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
+ + " . . .");
+ // Send a reply with the engine service information
+ sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
+ apexService.getEngineKeys(), apexService.getApexModelKey());
+ LOGGER.debug("returned engine service information for engine service "
+ + apexService.getKey().getId());
+ }
+
+ /**
+ * Handle the update model message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException {
+ final UpdateModel updateModelMessage = (UpdateModel) message;
+ LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
+ // Update the model
+ apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
+ updateModelMessage.isForceInstall());
+ // Send a reply indicating the message action worked
+ sendReply(webSocket, updateModelMessage, true,
+ "updated model in engine " + updateModelMessage.getTarget().getId());
+ LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
+ }
+
+ /**
+ * Handle the start engine message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
+ final StartEngine startEngineMessage = (StartEngine) message;
+ LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
+ // Start the engine
+ apexService.start(startEngineMessage.getTarget());
+ // Send a reply indicating the message action worked
+ sendReply(webSocket, startEngineMessage, true,
+ "started engine " + startEngineMessage.getTarget().getId());
+ LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
+ }
+
+ /**
+ * Handle the stop engine message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
+ final StopEngine stopEngineMessage = (StopEngine) message;
+ LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
+ // Stop the engine
+ apexService.stop(stopEngineMessage.getTarget());
+ // Send a reply indicating the message action worked
+ sendReply(webSocket, stopEngineMessage, true,
+ "stopped engine " + stopEngineMessage.getTarget().getId());
+ LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
+ }
+
+ /**
+ * Handle the start periodic events message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket)
+ throws ApexException {
+ final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
+ LOGGER.debug("starting periodic events on engine {} . . .",
+ startPeriodicEventsMessage.getTarget().getId());
+ // Start periodic events with the period specified in the message
+ final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
+ apexService.startPeriodicEvents(period);
+ // Send a reply indicating the message action worked
+ String periodicStartedMessage = "started periodic events on engine "
+ + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
+ sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
+ LOGGER.debug(periodicStartedMessage);
+ }
+
+ /**
+ * Handle the stop periodic events message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket)
+ throws ApexException {
+ final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
+ LOGGER.debug("stopping periodic events on engine {} . . .",
+ stopPeriodicEventsMessage.getTarget().getId());
+ // Stop periodic events
+ apexService.stopPeriodicEvents();
+ // Send a reply indicating the message action worked
+ sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
+ + stopPeriodicEventsMessage.getTarget().getId());
+ LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
+ }
+
+ /**
+ * Handle the engine status message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException {
+ final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
+ LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
+ // Send a reply with the engine status
+ sendReply(webSocket, getEngineStatusMessage, true,
+ apexService.getStatus(getEngineStatusMessage.getTarget()));
+ LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
+ }
+
+ /**
+ * Handle the engine information message.
+ *
+ * @param message the message
+ * @param webSocket the web socket that the message came on
+ * @throws ApexException on message handling exceptions
+ */
+ private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException {
+ final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
+ LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
+ // Send a reply with the engine runtime information
+ sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
+ LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
+ }
+
+ /**
* Send the Response message to the client.
*
* @param client the client to which to send the response message