diff options
Diffstat (limited to 'services/services-engine/src/main')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java | 326 |
1 files changed, 218 insertions, 108 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java index a9b862d41..261ba41cf 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java @@ -151,20 +151,26 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable public void run() { // Take messages off the queue and forward them to the Apex engine while (messageListenerThread.isAlive() && !stopOrderedFlag) { - try { - final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS); - if (data != null) { - final List<Message> messages = data.getMessages(); - for (final Message message : messages) { - handleMessage(message, data.getConnection()); - } + pollAndHandleMessage(); + } + } + + /** + * Poll the queue for a message and handle that message. + */ + private void pollAndHandleMessage() { + try { + final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS); + if (data != null) { + final List<Message> messages = data.getMessages(); + for (final Message message : messages) { + handleMessage(message, data.getConnection()); } - } catch (final InterruptedException e) { - // restore the interrupt status - Thread.currentThread().interrupt(); - LOGGER.debug("message listener execution has been interrupted"); - break; } + } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); + LOGGER.debug("message listener execution has been interrupted"); } } @@ -194,102 +200,7 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable + "action on received message invalid, action must be of type \"EnDepAction\""); } - // Handle each incoming message using the inevitable switch statement for the EngDep - // protocol - switch (enDepAction) { - case GET_ENGINE_SERVICE_INFO: - final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message; - LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId() - + " . . ."); - // Send a reply with the engine service information - sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(), - apexService.getEngineKeys(), apexService.getApexModelKey()); - LOGGER.debug("returned engine service information for engine service " - + apexService.getKey().getId()); - break; - - case UPDATE_MODEL: - final UpdateModel updateModelMessage = (UpdateModel) message; - LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId()); - // Update the model - apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(), - updateModelMessage.isForceInstall()); - // Send a reply indicating the message action worked - sendReply(webSocket, updateModelMessage, true, - "updated model in engine " + updateModelMessage.getTarget().getId()); - LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId()); - break; - - case START_ENGINE: - final StartEngine startEngineMessage = (StartEngine) message; - LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId()); - // Start the engine - apexService.start(startEngineMessage.getTarget()); - // Send a reply indicating the message action worked - sendReply(webSocket, startEngineMessage, true, - "started engine " + startEngineMessage.getTarget().getId()); - LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId()); - break; - - case STOP_ENGINE: - final StopEngine stopEngineMessage = (StopEngine) message; - LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId()); - // Stop the engine - apexService.stop(stopEngineMessage.getTarget()); - // Send a reply indicating the message action worked - sendReply(webSocket, stopEngineMessage, true, - "stopped engine " + stopEngineMessage.getTarget().getId()); - LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId()); - break; - - case START_PERIODIC_EVENTS: - final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message; - LOGGER.debug("starting periodic events on engine {} . . .", - startPeriodicEventsMessage.getTarget().getId()); - // Start periodic events with the period specified in the message - final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData()); - apexService.startPeriodicEvents(period); - // Send a reply indicating the message action worked - String periodicStartedMessage = "started periodic events on engine " - + startPeriodicEventsMessage.getTarget().getId() + " with period " + period; - sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage); - LOGGER.debug(periodicStartedMessage); - break; - - case STOP_PERIODIC_EVENTS: - final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message; - LOGGER.debug("stopping periodic events on engine {} . . .", - stopPeriodicEventsMessage.getTarget().getId()); - // Stop periodic events - apexService.stopPeriodicEvents(); - // Send a reply indicating the message action worked - sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine " - + stopPeriodicEventsMessage.getTarget().getId()); - LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId()); - break; - - case GET_ENGINE_STATUS: - final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message; - LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId()); - // Send a reply with the engine status - sendReply(webSocket, getEngineStatusMessage, true, - apexService.getStatus(getEngineStatusMessage.getTarget())); - LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId()); - break; - - case GET_ENGINE_INFO: - final GetEngineInfo getEngineInfo = (GetEngineInfo) message; - LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId()); - // Send a reply with the engine runtime information - sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget())); - LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId()); - break; - case RESPONSE: - throw new ApexException("RESPONSE action on received message not handled by engine"); - - default: - break; - } + handleIncomingMessages(message, webSocket, enDepAction); } catch (final ApexException e) { LOGGER.warn("apex failed to execute message", e); sendReply(webSocket, message, false, e.getCascadedMessage()); @@ -301,6 +212,205 @@ public class EngDepMessageListener implements MessageListener<Message>, Runnable } /** + * Handle incoming EngDep messages. + * + * @param message the incoming message + * @param webSocket the web socket the message came in on + * @param enDepAction the action from the message + * @throws ApexException on message handling errors + */ + private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction) + throws ApexException { + // Handle each incoming message using the inevitable switch statement for the EngDep + // protocol + switch (enDepAction) { + case GET_ENGINE_SERVICE_INFO: + handleGetEngineServiceInfoMessage(message, webSocket); + break; + + case UPDATE_MODEL: + handleUpdateModelMessage(message, webSocket); + break; + + case START_ENGINE: + handleStartEngineMessage(message, webSocket); + break; + + case STOP_ENGINE: + handleStopEngineMessage(message, webSocket); + break; + + case START_PERIODIC_EVENTS: + handleStartPeriodicEventsMessage(message, webSocket); + break; + + case STOP_PERIODIC_EVENTS: + handleStopPeriodicEventsMessage(message, webSocket); + break; + + case GET_ENGINE_STATUS: + handleEngineStatusMessage(message, webSocket); + break; + + case GET_ENGINE_INFO: + handleEngineInfoMessage(message, webSocket); + break; + + case RESPONSE: + throw new ApexException("RESPONSE action on received message not handled by engine"); + + default: + break; + } + } + + /** + * Handle the get engine service information message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) { + final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message; + LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId() + + " . . ."); + // Send a reply with the engine service information + sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(), + apexService.getEngineKeys(), apexService.getApexModelKey()); + LOGGER.debug("returned engine service information for engine service " + + apexService.getKey().getId()); + } + + /** + * Handle the update model message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException { + final UpdateModel updateModelMessage = (UpdateModel) message; + LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId()); + // Update the model + apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(), + updateModelMessage.isForceInstall()); + // Send a reply indicating the message action worked + sendReply(webSocket, updateModelMessage, true, + "updated model in engine " + updateModelMessage.getTarget().getId()); + LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId()); + } + + /** + * Handle the start engine message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException { + final StartEngine startEngineMessage = (StartEngine) message; + LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId()); + // Start the engine + apexService.start(startEngineMessage.getTarget()); + // Send a reply indicating the message action worked + sendReply(webSocket, startEngineMessage, true, + "started engine " + startEngineMessage.getTarget().getId()); + LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId()); + } + + /** + * Handle the stop engine message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException { + final StopEngine stopEngineMessage = (StopEngine) message; + LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId()); + // Stop the engine + apexService.stop(stopEngineMessage.getTarget()); + // Send a reply indicating the message action worked + sendReply(webSocket, stopEngineMessage, true, + "stopped engine " + stopEngineMessage.getTarget().getId()); + LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId()); + } + + /** + * Handle the start periodic events message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket) + throws ApexException { + final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message; + LOGGER.debug("starting periodic events on engine {} . . .", + startPeriodicEventsMessage.getTarget().getId()); + // Start periodic events with the period specified in the message + final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData()); + apexService.startPeriodicEvents(period); + // Send a reply indicating the message action worked + String periodicStartedMessage = "started periodic events on engine " + + startPeriodicEventsMessage.getTarget().getId() + " with period " + period; + sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage); + LOGGER.debug(periodicStartedMessage); + } + + /** + * Handle the stop periodic events message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket) + throws ApexException { + final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message; + LOGGER.debug("stopping periodic events on engine {} . . .", + stopPeriodicEventsMessage.getTarget().getId()); + // Stop periodic events + apexService.stopPeriodicEvents(); + // Send a reply indicating the message action worked + sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine " + + stopPeriodicEventsMessage.getTarget().getId()); + LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId()); + } + + /** + * Handle the engine status message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException { + final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message; + LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId()); + // Send a reply with the engine status + sendReply(webSocket, getEngineStatusMessage, true, + apexService.getStatus(getEngineStatusMessage.getTarget())); + LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId()); + } + + /** + * Handle the engine information message. + * + * @param message the message + * @param webSocket the web socket that the message came on + * @throws ApexException on message handling exceptions + */ + private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException { + final GetEngineInfo getEngineInfo = (GetEngineInfo) message; + LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId()); + // Send a reply with the engine runtime information + sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget())); + LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId()); + } + + /** * Send the Response message to the client. * * @param client the client to which to send the response message |