/*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.policy.apex.service.engine.engdep; import com.google.common.eventbus.Subscribe; import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.java_websocket.WebSocket; import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.core.protocols.Message; import org.onap.policy.apex.core.protocols.engdep.EngDepAction; import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse; import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo; import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo; import 
org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus; import org.onap.policy.apex.core.protocols.engdep.messages.Response; import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine; import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents; import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine; import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents; import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.service.engine.runtime.EngineService; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** * The listener interface for receiving engDepMessage events. The class that is interested in processing an engDepMessage * event implements this interface, and the object created with that class is registered with a component using the * component's addEngDepMessageListener method. When the engDepMessage event occurs, that object's * appropriate method is invoked. * *

This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) */ public class EngDepMessageListener implements MessageListener, Runnable { private static final int LISTENER_STOP_WAIT_INTERVAL = 10; private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class); // The timeout to wait between queue poll timeouts in milliseconds private static final long QUEUE_POLL_TIMEOUT = 50; // The Apex service itself private final EngineService apexService; // The message listener thread and stopping flag private Thread messageListenerThread; private boolean stopOrderedFlag = false; // The message queue is used to hold messages prior to forwarding to Apex private final BlockingQueue> messageQueue = new LinkedBlockingDeque<>(); /** * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The * apexService is the Apex service to send the messages onto. * * @param apexService the Apex engine service */ protected EngDepMessageListener(final EngineService apexService) { this.apexService = apexService; } /** * This method is an implementation of the message listener. It receives a message and places it on the queue for * processing by the message listening thread. 
* * @param data the data * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage * (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data) */ @Subscribe @Override public void onMessage(final MessageBlock data) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("message received from client application {} port {}", data.getConnection().getRemoteSocketAddress().getAddress(), data.getConnection().getRemoteSocketAddress().getPort()); } messageQueue.add(data); } /* * (non-Javadoc) * * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String) */ @Override public void onMessage(final String messageString) { throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol"); } /** * This method gets a new message listening thread from the thread factory and starts it. */ public void startProcessorThread() { LOGGER.entry(); messageListenerThread = new Thread(this); messageListenerThread.setDaemon(true); messageListenerThread.start(); LOGGER.exit(); } /** * Stops the message listening threads. */ public void stopProcessorThreads() { LOGGER.entry(); stopOrderedFlag = true; while (messageListenerThread.isAlive()) { ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL); } LOGGER.exit(); } /** * Runs the message listening thread. 
Here, the messages come in on the message queue and are processed one by one */ @Override public void run() { // Take messages off the queue and forward them to the Apex engine while (messageListenerThread.isAlive() && !stopOrderedFlag) { try { final MessageBlock data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS); if (data != null) { final List messages = data.getMessages(); for (final Message message : messages) { handleMessage(message, data.getConnection()); } } } catch (final InterruptedException e) { // restore the interrupt status Thread.currentThread().interrupt(); LOGGER.debug("message listener execution has been interrupted"); break; } } } /** * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the * messages. * * @param message the incoming EngDep message * @param webSocket the web socket on which the message came in */ private void handleMessage(final Message message, final WebSocket webSocket) { LOGGER.entry(webSocket.getRemoteSocketAddress().toString()); if (message.getAction() == null) { // This is a response message return; } try { LOGGER.debug("Manager action {} being applied to engine", message.getAction()); // Get and check the incoming action for validity EngDepAction enDepAction = null; if (message.getAction() instanceof EngDepAction) { enDepAction = (EngDepAction) message.getAction(); } else { throw new ApexException(message.getAction().getClass().getName() + "action on received message invalid, action must be of type \"EnDepAction\""); } // Handle each incoming message using the inevitable switch statement for the EngDep // protocol switch (enDepAction) { case GET_ENGINE_SERVICE_INFO: final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message; LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId() + " . . 
."); // Send a reply with the engine service information sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(), apexService.getEngineKeys(), apexService.getApexModelKey()); LOGGER.debug("returned engine service information for engine service " + apexService.getKey().getId()); break; case UPDATE_MODEL: final UpdateModel updateModelMessage = (UpdateModel) message; LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId()); // Update the model apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(), updateModelMessage.isForceInstall()); // Send a reply indicating the message action worked sendReply(webSocket, updateModelMessage, true, "updated model in engine " + updateModelMessage.getTarget().getId()); LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId()); break; case START_ENGINE: final StartEngine startEngineMessage = (StartEngine) message; LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId()); // Start the engine apexService.start(startEngineMessage.getTarget()); // Send a reply indicating the message action worked sendReply(webSocket, startEngineMessage, true, "started engine " + startEngineMessage.getTarget().getId()); LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId()); break; case STOP_ENGINE: final StopEngine stopEngineMessage = (StopEngine) message; LOGGER.debug("stopping engine {} . . 
.", stopEngineMessage.getTarget().getId()); // Stop the engine apexService.stop(stopEngineMessage.getTarget()); // Send a reply indicating the message action worked sendReply(webSocket, stopEngineMessage, true, "stopped engine " + stopEngineMessage.getTarget().getId()); LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId()); break; case START_PERIODIC_EVENTS: final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message; LOGGER.debug("starting periodic events on engine {} . . .", startPeriodicEventsMessage.getTarget().getId()); // Start periodic events with the period specified in the message final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData()); apexService.startPeriodicEvents(period); // Send a reply indicating the message action worked String periodicStartedMessage = "started periodic events on engine " + startPeriodicEventsMessage.getTarget().getId() + " with period " + period; sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage); LOGGER.debug(periodicStartedMessage); break; case STOP_PERIODIC_EVENTS: final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message; LOGGER.debug("stopping periodic events on engine {} . . .", stopPeriodicEventsMessage.getTarget().getId()); // Stop periodic events apexService.stopPeriodicEvents(); // Send a reply indicating the message action worked sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId()); LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId()); break; case GET_ENGINE_STATUS: final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message; LOGGER.debug("getting status for engine{} . . 
.", getEngineStatusMessage.getTarget().getId()); // Send a reply with the engine status sendReply(webSocket, getEngineStatusMessage, true, apexService.getStatus(getEngineStatusMessage.getTarget())); LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId()); break; case GET_ENGINE_INFO: final GetEngineInfo getEngineInfo = (GetEngineInfo) message; LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId()); // Send a reply with the engine runtime information sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget())); LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId()); break; case RESPONSE: throw new ApexException("RESPONSE action on received message not handled by engine"); default: break; } } catch (final ApexException e) { LOGGER.warn("apex failed to execute message", e); sendReply(webSocket, message, false, e.getCascadedMessage()); } catch (final Exception e) { LOGGER.warn("system failure executing message", e); sendReply(webSocket, message, false, e.getMessage()); } LOGGER.exit(); } /** * Send the Response message to the client. 
* * @param client the client to which to send the response message * @param requestMessage the message to which we are responding * @param result the result indicating success or failure * @param messageData the message data */ private void sendReply(final WebSocket client, final Message requestMessage, final boolean result, final String messageData) { LOGGER.entry(result, messageData); if (client == null || !client.isOpen()) { LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction()); return; } String replyString = "sending " + requestMessage.getAction() + " to web socket " + client.getRemoteSocketAddress().toString(); LOGGER.debug(replyString); final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage); responseMessage.setMessageData(messageData); final MessageHolder messageHolder = new MessageHolder<>(MessagingUtils.getHost()); messageHolder.addMessage(responseMessage); client.send(MessagingUtils.serializeObject(messageHolder)); LOGGER.exit(); } /** * Send the EngineServiceInfoResponse message to the client. 
* * @param client the client to which to send the response message * @param requestMessage the message to which we are responding * @param engineServiceKey The key of this engine service * @param engineKeyCollection The keys of the engines in this engine service * @param apexModelKey the apex model key */ private void sendServiceInfoReply(final WebSocket client, final Message requestMessage, final AxArtifactKey engineServiceKey, final Collection engineKeyCollection, final AxArtifactKey apexModelKey) { LOGGER.entry(); String sendingMessage = "sending " + requestMessage.getAction() + " to web socket " + client.getRemoteSocketAddress().toString(); LOGGER.debug(sendingMessage); final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(), true, requestMessage); responseMessage.setMessageData("engine service information"); responseMessage.setEngineServiceKey(engineServiceKey); responseMessage.setEngineKeyArray(engineKeyCollection); responseMessage.setApexModelKey(apexModelKey); final MessageHolder messageHolder = new MessageHolder<>(MessagingUtils.getHost()); messageHolder.addMessage(responseMessage); client.send(MessagingUtils.serializeObject(messageHolder)); LOGGER.exit(); } }