summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main/java')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java482
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java113
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/package-info.java26
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java56
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEngineServiceHandler.java29
5 files changed, 39 insertions, 667 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
deleted file mode 100644
index 113d71cfe..000000000
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessageListener.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.apex.service.engine.engdep;
-
-import com.google.common.eventbus.Subscribe;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import org.java_websocket.WebSocket;
-import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
-import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
-import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
-import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.core.protocols.Message;
-import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
-import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
-import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
-import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
-import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
-import org.onap.policy.apex.core.protocols.engdep.messages.Response;
-import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
-import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
-import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
-import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
-import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
-import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
-import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
-import org.onap.policy.apex.service.engine.runtime.EngineService;
-import org.slf4j.ext.XLogger;
-import org.slf4j.ext.XLoggerFactory;
-
-/**
- * The listener interface for receiving engDepMessage events. The class that is interested in processing a engDepMessage
- * event implements this interface, and the object created with that class is registered with a component using the
- * component's <code>addEngDepMessageListener</code> method. When the engDepMessage event occurs, that object's
- * appropriate method is invoked.
- *
- * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message
- * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine.
- *
- * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- */
-public class EngDepMessageListener implements MessageListener<Message>, Runnable {
- private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
-
- private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
-
- // The timeout to wait between queue poll timeouts in milliseconds
- private static final long QUEUE_POLL_TIMEOUT = 50;
-
- // The Apex service itself
- private final EngineService apexService;
-
- // The message listener thread and stopping flag
- private Thread messageListenerThread;
- private volatile boolean stopOrderedFlag = false;
-
- // The message queue is used to hold messages prior to forwarding to Apex
- private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
-
- /**
- * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The
- * <code>apexService</code> is the Apex service to send the messages onto.
- *
- * @param apexService the Apex engine service
- */
- protected EngDepMessageListener(final EngineService apexService) {
- this.apexService = apexService;
- }
-
- /**
- * This method is an implementation of the message listener. It receives a message and places it on the queue for
- * processing by the message listening thread.
- *
- * @param data the data
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
- * (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
- */
- @Subscribe
- @Override
- public void onMessage(final MessageBlock<Message> data) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("message received from client application {} port {}",
- data.getConnection().getRemoteSocketAddress().getAddress(),
- data.getConnection().getRemoteSocketAddress().getPort());
- }
- messageQueue.add(data);
- }
-
- /**
- * {@inheritDoc}.
- */
- @Override
- public void onMessage(final String messageString) {
- throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
- }
-
- /**
- * This method gets a new message listening thread from the thread factory and starts it.
- */
- public void startProcessorThread() {
- LOGGER.entry();
- messageListenerThread = new Thread(this);
- messageListenerThread.setDaemon(true);
- messageListenerThread.start();
- LOGGER.exit();
- }
-
- /**
- * Stops the message listening threads.
- */
- public void stopProcessorThreads() {
- LOGGER.entry();
- stopOrderedFlag = true;
-
- while (messageListenerThread.isAlive()) {
- ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
- }
- LOGGER.exit();
- }
-
- /**
- * Runs the message listening thread. Here, the messages come in on the message queue and are processed one by one
- */
- @Override
- public void run() {
- // Take messages off the queue and forward them to the Apex engine
- while (!stopOrderedFlag) {
- pollAndHandleMessage();
- }
- }
-
- /**
- * Poll the queue for a message and handle that message.
- */
- private void pollAndHandleMessage() {
- try {
- final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
- if (data != null) {
- final List<Message> messages = data.getMessages();
- for (final Message message : messages) {
- handleMessage(message, data.getConnection());
- }
- }
- } catch (final InterruptedException e) {
- // restore the interrupt status
- Thread.currentThread().interrupt();
- LOGGER.debug("message listener execution has been interrupted");
- }
- }
-
- /**
- * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the
- * messages.
- *
- * @param message the incoming EngDep message
- * @param webSocket the web socket on which the message came in
- */
- private void handleMessage(final Message message, final WebSocket webSocket) {
- LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
- if (message.getAction() == null) {
- // This is a response message
- return;
- }
-
- try {
- LOGGER.debug("Manager action {} being applied to engine", message.getAction());
-
- // Get and check the incoming action for validity
- EngDepAction enDepAction = null;
- if (message.getAction() instanceof EngDepAction) {
- enDepAction = (EngDepAction) message.getAction();
- } else {
- throw new ApexException(message.getAction().getClass().getName()
- + "action on received message invalid, action must be of type \"EnDepAction\"");
- }
-
- handleIncomingMessages(message, webSocket, enDepAction);
- } catch (final ApexException e) {
- LOGGER.warn("apex failed to execute message", e);
- sendReply(webSocket, message, false, e.getCascadedMessage());
- } catch (final Exception e) {
- LOGGER.warn("system failure executing message", e);
- sendReply(webSocket, message, false, e.getMessage());
- }
- LOGGER.exit();
- }
-
- /**
- * Handle incoming EngDep messages.
- *
- * @param message the incoming message
- * @param webSocket the web socket the message came in on
- * @param enDepAction the action from the message
- * @throws ApexException on message handling errors
- */
- private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction)
- throws ApexException {
- // Handle each incoming message using the inevitable switch statement for the EngDep
- // protocol
- switch (enDepAction) {
- case GET_ENGINE_SERVICE_INFO:
- handleGetEngineServiceInfoMessage(message, webSocket);
- break;
-
- case UPDATE_MODEL:
- handleUpdateModelMessage(message, webSocket);
- break;
-
- case START_ENGINE:
- handleStartEngineMessage(message, webSocket);
- break;
-
- case STOP_ENGINE:
- handleStopEngineMessage(message, webSocket);
- break;
-
- case START_PERIODIC_EVENTS:
- handleStartPeriodicEventsMessage(message, webSocket);
- break;
-
- case STOP_PERIODIC_EVENTS:
- handleStopPeriodicEventsMessage(message, webSocket);
- break;
-
- case GET_ENGINE_STATUS:
- handleEngineStatusMessage(message, webSocket);
- break;
-
- case GET_ENGINE_INFO:
- handleEngineInfoMessage(message, webSocket);
- break;
-
- default:
- throw new ApexException("action " + enDepAction + " on received message not handled by engine");
- }
- }
-
- /**
- * Handle the get engine service information message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) {
- final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
- LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
- + " . . .");
- // Send a reply with the engine service information
- sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
- apexService.getEngineKeys(), apexService.getApexModelKey());
- LOGGER.debug("returned engine service information for engine service "
- + apexService.getKey().getId());
- }
-
- /**
- * Handle the update model message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException {
- final UpdateModel updateModelMessage = (UpdateModel) message;
- LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
- // Update the model
- apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
- updateModelMessage.isForceInstall());
- // Send a reply indicating the message action worked
- sendReply(webSocket, updateModelMessage, true,
- "updated model in engine " + updateModelMessage.getTarget().getId());
- LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
- }
-
- /**
- * Handle the start engine message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
- final StartEngine startEngineMessage = (StartEngine) message;
- LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
- // Start the engine
- apexService.start(startEngineMessage.getTarget());
- // Send a reply indicating the message action worked
- sendReply(webSocket, startEngineMessage, true,
- "started engine " + startEngineMessage.getTarget().getId());
- LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
- }
-
- /**
- * Handle the stop engine message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
- final StopEngine stopEngineMessage = (StopEngine) message;
- LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
- // Stop the engine
- apexService.stop(stopEngineMessage.getTarget());
- // Send a reply indicating the message action worked
- sendReply(webSocket, stopEngineMessage, true,
- "stopped engine " + stopEngineMessage.getTarget().getId());
- LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
- }
-
- /**
- * Handle the start periodic events message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket)
- throws ApexException {
- final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
- LOGGER.debug("starting periodic events on engine {} . . .",
- startPeriodicEventsMessage.getTarget().getId());
- // Start periodic events with the period specified in the message
- final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
- apexService.startPeriodicEvents(period);
- // Send a reply indicating the message action worked
- String periodicStartedMessage = "started periodic events on engine "
- + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
- sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
- LOGGER.debug(periodicStartedMessage);
- }
-
- /**
- * Handle the stop periodic events message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket)
- throws ApexException {
- final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
- LOGGER.debug("stopping periodic events on engine {} . . .",
- stopPeriodicEventsMessage.getTarget().getId());
- // Stop periodic events
- apexService.stopPeriodicEvents();
- // Send a reply indicating the message action worked
- sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
- + stopPeriodicEventsMessage.getTarget().getId());
- LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
- }
-
- /**
- * Handle the engine status message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException {
- final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
- LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
- // Send a reply with the engine status
- sendReply(webSocket, getEngineStatusMessage, true,
- apexService.getStatus(getEngineStatusMessage.getTarget()));
- LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
- }
-
- /**
- * Handle the engine information message.
- *
- * @param message the message
- * @param webSocket the web socket that the message came on
- * @throws ApexException on message handling exceptions
- */
- private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException {
- final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
- LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
- // Send a reply with the engine runtime information
- sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
- LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
- }
-
- /**
- * Get the local address for the WS MessageHolder, or null if there is a problem.
- */
- private InetAddress getLocalAddress() {
- try {
- return MessagingUtils.getLocalHostLanAddress();
- } catch (UnknownHostException e) {
- LOGGER.debug("failed to find the localhost address - continuing ...", e);
- return null;
- }
- }
-
- /**
- * Send the Response message to the client.
- *
- * @param client the client to which to send the response message
- * @param requestMessage the message to which we are responding
- * @param result the result indicating success or failure
- * @param messageData the message data
- */
- private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
- final String messageData) {
- LOGGER.entry(result, messageData);
-
- if (client == null || !client.isOpen()) {
- LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
- return;
- }
-
- String replyString = "sending " + requestMessage.getAction() + " to web socket "
- + client.getRemoteSocketAddress().toString();
- LOGGER.debug(replyString);
-
- final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
- responseMessage.setMessageData(messageData);
-
- final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
- messageHolder.addMessage(responseMessage);
- client.send(MessagingUtils.serializeObject(messageHolder));
-
- LOGGER.exit();
- }
-
- /**
- * Send the EngineServiceInfoResponse message to the client.
- *
- * @param client the client to which to send the response message
- * @param requestMessage the message to which we are responding
- * @param engineServiceKey The key of this engine service
- * @param engineKeyCollection The keys of the engines in this engine service
- * @param apexModelKey the apex model key
- */
- private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
- final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
- final AxArtifactKey apexModelKey) {
- LOGGER.entry();
- String sendingMessage = "sending " + requestMessage.getAction() + " to web socket "
- + client.getRemoteSocketAddress().toString();
- LOGGER.debug(sendingMessage);
-
- final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(),
- true, requestMessage);
- responseMessage.setMessageData("engine service information");
- responseMessage.setEngineServiceKey(engineServiceKey);
- responseMessage.setEngineKeyArray(engineKeyCollection);
- responseMessage.setApexModelKey(apexModelKey);
-
- final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
- messageHolder.addMessage(responseMessage);
- client.send(MessagingUtils.serializeObject(messageHolder));
-
- LOGGER.exit();
- }
-}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java
deleted file mode 100644
index 91c1fb13c..000000000
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/EngDepMessagingService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.apex.service.engine.engdep;
-
-import java.net.InetSocketAddress;
-import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
-import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
-import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
-import org.onap.policy.apex.core.protocols.Message;
-import org.onap.policy.apex.service.engine.runtime.EngineService;
-import org.slf4j.ext.XLogger;
-import org.slf4j.ext.XLoggerFactory;
-
-/**
- * The Class EngDepMessagingService is used to encapsulate the server side of EngDep communication. This class allows
- * users to create and start an EngDep server.
- *
- * @author Liam Fallon (liam.fallon@ericsson.com)
- */
-public class EngDepMessagingService {
- private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessagingService.class);
-
- private final MessagingService<Message> messageService;
-
- // The listener that is listening for messages coming in on the EngDep protocol from clients
- private final EngDepMessageListener messageListener;
-
- /**
- * Instantiates a new EngDep messaging service. It creates the message service instance, a listener for incoming
- * messages, and starts the message listener thread for handling incoming messages.
- *
- * @param service the Apex engine service that this EngDep service is running for
- * @param port the port The port to use for EngDep communication
- */
- public EngDepMessagingService(final EngineService service, final int port) {
- LOGGER.entry(service);
-
- // Create the service and listener and add the listener.
- messageService = getMessageService(port);
- messageListener = new EngDepMessageListener(service);
- messageService.addMessageListener(messageListener);
-
- // Start incoming message processing on the listener
- messageListener.startProcessorThread();
- LOGGER.exit();
- }
-
- /**
- * Start the server, open the communication mechanism for connections.
- */
- public void start() {
- LOGGER.info("engine<-->deployment messaging starting . . .");
- messageService.startConnection();
- LOGGER.info("engine<-->deployment messaging started");
- }
-
- /**
- * Start the server, close the communication mechanism.
- */
- public void stop() {
- LOGGER.info("engine<-->deployment messaging stopping . . .");
- messageService.stopConnection();
- messageListener.stopProcessorThreads();
- LOGGER.info("engine<-->deployment messaging stopped");
- }
-
- /**
- * Is the server started?.
- *
- * @return true, if checks if is started
- */
- public boolean isStarted() {
- return messageService.isStarted();
- }
-
- /**
- * Is the server stopped?.
- *
- * @return true, if checks if is stopped
- */
- public boolean isStopped() {
- return !messageService.isStarted();
- }
-
- /**
- * Get a message service instance. This method is protected so that it can be intercepted in unit test.
- * @param port the message service port
- * @return the message service
- */
- protected MessagingService<Message> getMessageService(final int port) {
- // Messaging service is used to transmit and receive messages over a communication protocol
- MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
- return factory.createServer(new InetSocketAddress(MessagingUtils.checkPort(port)));
- }
-}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/package-info.java
deleted file mode 100644
index 41f644465..000000000
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/engdep/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-/**
- * Gives access to the APEX EngDep protocol for APEX engine management at runtime over a Java API.
- *
- * @author Liam Fallon (liam.fallon@ericsson.com)
- */
-package org.onap.policy.apex.service.engine.engdep;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index a928f208c..233fa1aa0 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -43,7 +43,6 @@ import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
-import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
@@ -58,8 +57,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This class wraps an Apex engine so that it can be activated as a complete service together with all its context,
- * executor, and event plugins.
+ * This class wraps an Apex engine so that it can be activated as a complete
+ * service together with all its context, executor, and event plugins.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -83,7 +82,8 @@ public class ApexActivator {
// Event marshalers are used to send events asynchronously from Apex
private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
- // The engine service handler holds the references to the engine and its EngDep deployment
+ // The engine service handler holds the references to the engine and its EngDep
+ // deployment
// interface. It also acts as a receiver for asynchronous
// and synchronous events from the engine.
private ApexEngineServiceHandler engineServiceHandler = null;
@@ -130,8 +130,8 @@ public class ApexActivator {
policyModelsMap = new LinkedHashMap<>();
Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
- Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet =
- new LinkedHashSet<>(apexParametersMap.entrySet());
+ Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>(
+ apexParametersMap.entrySet());
apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
ApexParameters apexParams = apexParamsEntry.getValue();
List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet());
@@ -180,11 +180,12 @@ public class ApexActivator {
}
private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
- // Doing a deep copy so that original values in policyModelsMap is retained after reduction operation
+ // Doing a deep copy so that original values in policyModelsMap is retained
+ // after reduction operation
Set<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
.map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
- Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry =
- policyModelsEntries.stream().reduce((entry1, entry2) -> {
+ Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream()
+ .reduce((entry1, entry2) -> {
try {
entry1.setValue(
PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
@@ -207,7 +208,8 @@ public class ApexActivator {
Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
throws ApexEventException {
- // Producer parameters specify what event marshalers to handle events leaving Apex are
+ // Producer parameters specify what event marshalers to handle events leaving
+ // Apex are
// set up and how they are set up
for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
@@ -217,7 +219,8 @@ public class ApexActivator {
marshallerMap.put(outputParameters.getKey(), marshaller);
}
- // Consumer parameters specify what event unmarshalers to handle events coming into Apex
+ // Consumer parameters specify what event unmarshalers to handle events coming
+ // into Apex
// are set up and how they are set up
for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
@@ -229,7 +232,8 @@ public class ApexActivator {
private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
Map<String, EventHandlerParameters> outputParametersMap) {
- // stop and remove any marshaller/unmarshaller that is part of a policy that is undeployed
+ // stop and remove any marshaller/unmarshaller that is part of a policy that is
+ // undeployed
marshallerMap.entrySet().stream()
.filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
.forEach(marshallerEntry -> marshallerEntry.getValue().stop());
@@ -239,7 +243,8 @@ public class ApexActivator {
.forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
- // If a marshaller/unmarshaller is already initialized, they don't need to be reinitialized during model update.
+ // If a marshaller/unmarshaller is already initialized, they don't need to be
+ // reinitialized during model update.
outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
}
@@ -252,20 +257,16 @@ public class ApexActivator {
LOGGER.debug("starting apex engine service . . .");
apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
- // Instantiate and start the messaging service for Deployment
- LOGGER.debug("starting apex deployment service . . .");
- final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
- apexParameters.getEngineServiceParameters().getDeploymentPort());
- engDepService.start();
-
- // Create the engine holder to hold the engine's references and act as an event receiver
- engineServiceHandler = new ApexEngineServiceHandler(apexEngineService, engDepService);
+ // Create the engine holder to hold the engine's references and act as an event
+ // receiver
+ engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
}
/**
- * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
- * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
- * we'll also find its paired marshaler
+ * Set up unmarshaler/marshaler pairing for synchronized event handling. We only
+ * need to traverse the unmarshalers because the unmarshalers and marshalers are
+ * paired one to one uniquely so if we find a synchronized unmarshaler we'll
+ * also find its paired marshaler
*
* @param inputParametersMap the apex parameters
*/
@@ -278,8 +279,8 @@ public class ApexActivator {
// Check if the unmarshaler is synchronized with a marshaler
if (inputParameters.getValue().isPeeredMode(peeredMode)) {
// Find the unmarshaler and marshaler
- final ApexEventMarshaller peeredMarshaler =
- marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
+ final ApexEventMarshaller peeredMarshaler = marshallerMap
+ .get(inputParameters.getValue().getPeer(peeredMode));
// Connect the unmarshaler and marshaler
unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
@@ -289,7 +290,8 @@ public class ApexActivator {
}
/**
- * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
+ * Start up event processing, this happens once all marshaller to unmarshaller
+ * wiring has been done.
*
* @param inputParametersMap the apex parameters
*/
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEngineServiceHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEngineServiceHandler.java
index 516ea4f2a..f278fd59d 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEngineServiceHandler.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEngineServiceHandler.java
@@ -1,19 +1,20 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * Modifications Copyright (C) 2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -21,16 +22,15 @@
package org.onap.policy.apex.service.engine.main;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
-import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
import org.onap.policy.apex.service.engine.event.ApexEvent;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class ApexEngineServiceHandler holds the reference to the Apex engine service and the EngDep
- * service for that engine. It also acts as an event receiver for asynchronous and synchronous
- * events.
+ * The Class ApexEngineServiceHandler holds the reference to the Apex engine
+ * service and the EngDep service for that engine. It also acts as an event
+ * receiver for asynchronous and synchronous events.
*/
public class ApexEngineServiceHandler {
// The logger for this class
@@ -39,23 +39,19 @@ public class ApexEngineServiceHandler {
// The Apex engine service, the Apex engine itself
private final EngineService apexEngineService;
- // The interface between the Apex engine and Apex policy deployment for the Apex engine
- private final EngDepMessagingService engDepService;
-
/**
* Instantiates a new engine holder with its engine service and EngDep service.
*
* @param apexEngineService the apex engine service
- * @param engDepService the EngDep service
+ * @param engDepService the EngDep service
*/
- ApexEngineServiceHandler(final EngineService apexEngineService, final EngDepMessagingService engDepService) {
+ ApexEngineServiceHandler(final EngineService apexEngineService) {
this.apexEngineService = apexEngineService;
- this.engDepService = engDepService;
}
/**
* This method forwards an event to the Apex service.
- *
+ *
* @param apexEvent The event to forward to Apex
*/
public void forwardEvent(final ApexEvent apexEvent) {
@@ -75,11 +71,6 @@ public class ApexEngineServiceHandler {
* @throws ApexException on termination errors
*/
public void terminate() throws ApexException {
- // Shut down engine management
- if (engDepService != null) {
- engDepService.stop();
- }
-
// Shut down each engine instance
if (apexEngineService != null) {
apexEngineService.stop();