aboutsummaryrefslogtreecommitdiffstats
path: root/core/core-deployment/src
diff options
context:
space:
mode:
authorramverma <ram.krishna.verma@ericsson.com>2018-06-01 11:51:36 +0100
committerramverma <ram.krishna.verma@ericsson.com>2018-06-04 10:50:44 +0100
commit37d6fd9069eb30d88c4ad80b5f35099ed173cc13 (patch)
tree0f30a71577644047feee43bd8857dc5a82a51c87 /core/core-deployment/src
parent5722440b2eb8ff1923dda9d4d856f0adc1ac8e6f (diff)
Adding apex core module to apex-pdp
Change-Id: I4bfe1df3e44fe62ff6789e813e59836e267ab3b2 Issue-ID: POLICY-858 Signed-off-by: ramverma <ram.krishna.verma@ericsson.com>
Diffstat (limited to 'core/core-deployment/src')
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java51
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java154
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java202
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java480
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java123
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java28
6 files changed, 1038 insertions, 0 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java
new file mode 100644
index 000000000..e932bbd45
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.deployment;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+
+/**
+ * The Class ApexDeploymentException is an exception that may be thrown on deployment errors in Apex.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexDeploymentException extends ApexException {
+ private static final long serialVersionUID = 1816909564890470707L;
+
+ /**
+ * Instantiates a new apex deployment exception.
+ *
+ * @param message the message
+ */
+ public ApexDeploymentException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiates a new apex deployment exception.
+ *
+ * @param message the message
+ * @param e the e
+ */
+ public ApexDeploymentException(final String message, final Exception e) {
+ super(message, e);
+ }
+}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java
new file mode 100644
index 000000000..499644fd9
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java
@@ -0,0 +1,154 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.deployment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class {@link BatchDeployer} deploys an Apex model held as an XML or Json file onto an Apex engine. It uses the
+ * EngDep protocol to communicate with the engine, with the EngDep protocol being carried on Java web sockets.
+ *
+ * This deployer is a simple command line deployer that reads the communication parameters and the location of the Apex
+ * model file as arguments.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class BatchDeployer {
+ private static final int NUM_ARGUMENTS = 3;
+
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class);
+
+ // The facade that is handling messaging to the engine service
+ private EngineServiceFacade engineServiceFacade = null;
+
+ /**
+ * The main method, reads the Apex server host address, port and location of the Apex model file from the command
+ * line arguments.
+ *
+ * @param args the arguments that specify the Apex engine and the Apex model file
+ */
+ public static void main(final String[] args) {
+ if (args.length != NUM_ARGUMENTS) {
+ LOGGER.error("invalid arguments: " + Arrays.toString(args));
+ LOGGER.error("usage: Deployer <server address> <port address> <Apex Model file location>");
+ return;
+ }
+
+ BatchDeployer deployer = null;
+ try {
+ // Use a Deployer object to handle model deployment
+ deployer = new BatchDeployer(args[0], Integer.parseInt(args[1]));
+ deployer.init();
+ deployer.deployModel(args[2], false, false);
+ deployer.startEngines();
+ } catch (final ApexException | IOException e) {
+ LOGGER.error("model deployment failed on parameters {}", args, e);
+ } finally {
+ if (deployer != null) {
+ deployer.close();
+ }
+ }
+ }
+
+ /**
+ * Instantiates a new deployer.
+ *
+ * @param hostName the host name of the host running the Apex Engine
+ * @param port the port to use for EngDep communication with the Apex engine
+ */
+ public BatchDeployer(final String hostName, final int port) {
+ engineServiceFacade = new EngineServiceFacade(hostName, port);
+ }
+
+ /**
+ * Initializes the deployer, opens an EngDep communication session with the Apex engine.
+ *
+ * @throws ApexDeploymentException thrown on deployment and communication errors
+ */
+ public void init() throws ApexDeploymentException {
+ engineServiceFacade.init();
+ }
+
+ /**
+ * Close the EngDep connection to the Apex server.
+ */
+ public void close() {
+ engineServiceFacade.close();
+ }
+
+ /**
+ * Deploy an Apex model on the Apex server.
+ *
+ * @param modelFileName the name of the model file containing the model to deploy
+ * @param ignoreConflicts true if conflicts between context in polices is to be ignored
+ * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @throws ApexException on Apex errors
+ * @throws IOException on IO exceptions from the operating system
+ */
+ public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
+ throws ApexException, IOException {
+ engineServiceFacade.deployModel(modelFileName, ignoreConflicts, force);
+ }
+
+ /**
+ * Deploy an Apex model on the Apex server.
+ *
+ * @param policyModel the model to deploy
+ * @param ignoreConflicts true if conflicts between context in polices is to be ignored
+ * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @throws ApexException on Apex errors
+ * @throws IOException on IO exceptions from the operating system
+ */
+ public void deployModel(final AxPolicyModel policyModel, final boolean ignoreConflicts, final boolean force)
+ throws ApexException, IOException {
+ engineServiceFacade.deployModel(policyModel, ignoreConflicts, force);
+ }
+
+ /**
+ * Start the Apex engines on the engine service.
+ *
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void startEngines() throws ApexDeploymentException {
+ for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
+ engineServiceFacade.startEngine(engineKey);
+ }
+ }
+
+ /**
+ * Stop the Apex engines on the engine service.
+ *
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void stopEngines() throws ApexDeploymentException {
+ for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
+ engineServiceFacade.stopEngine(engineKey);
+ }
+ }
+}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
new file mode 100644
index 000000000..c2a19a167
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
@@ -0,0 +1,202 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.deployment;
+
+import com.google.common.eventbus.Subscribe;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.core.protocols.Message;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
+ * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
+ * client thread and a receiving queue to queue messages received from the Apex engine.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class DeploymentClient implements Runnable {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
+
+ private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
+
+ // Host and port to use for EngDep messaging
+ private String host = null;
+ private int port = 0;
+
+ // Messaging service is used to transmit and receive messages over the web socket
+ private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
+ private MessagingService<Message> service = null;
+
+ // Send and receive queues for message buffering
+ private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
+
+ // Thread management fields
+ private boolean started = false;
+ private Thread thisThread = null;
+
+ /**
+ * Instantiates a new deployment client.
+ *
+ * @param host the host name that the EngDep server is running on
+ * @param port the port the port the EngDep server is using
+ */
+ public DeploymentClient(final String host, final int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ LOGGER.debug("engine<-->deployment to \"ws://" + host + ":" + port + "\" thread starting . . .");
+
+ // Set up the thread name
+ thisThread = Thread.currentThread();
+ thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
+
+ try {
+ // Establish a connection to the Apex server for EngDep message communication over Web Sockets
+ service = factory.createClient(new URI("ws://" + host + ":" + port));
+ service.addMessageListener(new DeploymentClientListener());
+
+ service.startConnection();
+ started = true;
+ LOGGER.debug("engine<-->deployment client thread started");
+ } catch (final Exception e) {
+ LOGGER.error("engine<-->deployment client thread exception", e);
+ return;
+ }
+
+ // Loop forever, sending messages as they appear on the queue
+ while (true) {
+ try {
+ final Message messageForSending = sendQueue.take();
+ sendMessage(messageForSending);
+ } catch (final InterruptedException e) {
+ // Message sending has been interrupted, we are finished
+ LOGGER.debug("engine<-->deployment client interrupted");
+ break;
+ }
+ }
+
+ // Thread has been interrupted
+ thisThread = null;
+ LOGGER.debug("engine<-->deployment client thread finished");
+ }
+
+ /**
+ * Send an EngDep message to the Apex server.
+ *
+ * @param message the message to send to the Apex server
+ */
+ public void sendMessage(final Message message) {
+ final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
+
+ // Send the message in its message holder
+ messageHolder.addMessage(message);
+ service.send(messageHolder);
+ }
+
+ /**
+ * Stop the deployment client.
+ */
+ public void stopClient() {
+ LOGGER.debug("engine<-->deployment test client stopping . . .");
+ thisThread.interrupt();
+
+ // Wait for the thread to stop
+ while (thisThread != null && thisThread.isAlive()) {
+ ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
+ }
+
+ // Close the Web Services connection
+ service.stopConnection();
+ started = false;
+ LOGGER.debug("engine<-->deployment test client stopped . . .");
+ }
+
+ /**
+ * Checks if the client thread is started.
+ *
+ * @return true, if the client thread is started
+ */
+ public boolean isStarted() {
+ return started;
+ }
+
+ /**
+ * Allows users of this class to get a reference to the receive queue to receove messages.
+ *
+ * @return the receive queue
+ */
+ public BlockingQueue<Message> getReceiveQueue() {
+ return receiveQueue;
+ }
+
+ /**
+ * The listener interface for receiving deploymentClient events. The class that is interested in processing a
+ * deploymentClient event implements this interface, and the object created with that class is registered with a
+ * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
+ * occurs, that object's appropriate method is invoked.
+ *
+ * @see DeploymentClientEvent
+ */
+ private class DeploymentClientListener implements MessageListener<Message> {
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
+ * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ */
+ @Subscribe
+ @Override
+ public void onMessage(final MessageBlock<Message> messageData) {
+ receiveQueue.addAll(messageData.getMessages());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
+ */
+ @Override
+ public void onMessage(final String messageString) {
+ throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");
+ }
+ }
+}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java
new file mode 100644
index 000000000..d954feaa3
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java
@@ -0,0 +1,480 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.deployment;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.core.protocols.Message;
+import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
+import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
+import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
+import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
+import org.onap.policy.apex.core.protocols.engdep.messages.Response;
+import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
+import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
+import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
+import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
+import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
+import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
+import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.utilities.ResourceUtils;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
+ * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
+ *
+ * This deployer is a simple command line deployer that reads the communication parameters and the location of the XML
+ * model file as arguments.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EngineServiceFacade {
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
+
+ // The default message timeout and timeout increment (the amount of time between polls) in milliseconds
+ private static final int CLIENT_START_WAIT_INTERVAL = 100;
+ private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
+ private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
+
+ // The Apex engine host and EngDep port
+ private final String hostName;
+ private final int port;
+
+ // The deployment client handles the EngDep communication session towards the Apex server
+ private DeploymentClient client = null;
+ private Thread clientThread = null;
+
+ // Information about the Engine service we are connected to
+ private AxArtifactKey engineServiceKey = null;
+ private AxArtifactKey[] engineKeyArray = null;
+ private AxArtifactKey apexModelKey = null;
+
+ /**
+ * Instantiates a new deployer.
+ *
+ * @param hostName the host name of the host running the Apex Engine
+ * @param port the port to use for EngDep communication with the Apex engine
+ */
+ public EngineServiceFacade(final String hostName, final int port) {
+ this.hostName = hostName;
+ this.port = port;
+ }
+
+ /**
+ * Initializes the facade, opens an EngDep communication session with the Apex engine.
+ *
+ * @throws ApexDeploymentException thrown on deployment and communication errors
+ */
+ public void init() throws ApexDeploymentException {
+ try {
+ LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
+
+ // Use the deployment client to handle the EngDep communication towards the Apex server. It runs a thread to
+ // monitor the session and to send
+ // messages
+ client = new DeploymentClient(hostName, port);
+ clientThread = new Thread(client);
+ clientThread.start();
+
+ // Wait for the connection to come up
+ while (!client.isStarted()) {
+ if (clientThread.isAlive()) {
+ ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
+ } else {
+ LOGGER.error("cound not handshake with server {}:{}", hostName, port);
+ throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port);
+ }
+ }
+
+ LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
+
+ // Get engine service information to see what engines we're dealing with
+ final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null);
+ LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo,
+ hostName, port);
+ client.sendMessage(engineServiceInfo);
+ LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port);
+
+ final EngineServiceInfoResponse engineServiceInfoResponse =
+ (EngineServiceInfoResponse) getResponse(engineServiceInfo);
+ if (engineServiceInfoResponse.isSuccessful()) {
+ engineServiceKey = engineServiceInfoResponse.getEngineServiceKey();
+ engineKeyArray = engineServiceInfoResponse.getEngineKeyArray();
+ apexModelKey = engineServiceInfoResponse.getApexModelKey();
+ }
+ } catch (final Exception e) {
+ LOGGER.error("cound not handshake with server {}:{}", hostName, port, e);
+ client.stopClient();
+ throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e);
+ }
+
+ }
+
+ /**
+ * Get the engine service key.
+ *
+ * @return the engine service key
+ */
+ public AxArtifactKey getApexModelKey() {
+ return apexModelKey;
+ }
+
+ /**
+ * Get the keys of the engines on this engine service.
+ *
+ * @return the engine key array
+ */
+ public AxArtifactKey[] getEngineKeyArray() {
+ return engineKeyArray;
+ }
+
+ /**
+ * Get the engine service key.
+ *
+ * @return the engine service key
+ */
+ public AxArtifactKey getKey() {
+ return engineServiceKey;
+ }
+
+ /**
+ * Close the EngDep connection to the Apex server.
+ */
+ public void close() {
+ LOGGER.debug("closing connection to server {}:{} . . .", hostName, port);
+
+ client.stopClient();
+
+ LOGGER.debug("closed connection to server {}:{} . . .", hostName, port);
+ }
+
+ /**
+ * Deploy an Apex model on the Apex engine service.
+ *
+ * @param modelFileName the name of the model file containing the model to deploy
+ * @param ignoreConflicts true if conflicts between context in polices is to be ignored
+ * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @throws ApexException on Apex errors
+ * @throws IOException on IO exceptions from the operating system
+ */
+ public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
+ throws ApexException, IOException {
+ if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) {
+ LOGGER.error("cound not deploy apex model, deployer is not initialized");
+ throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized");
+ }
+
+ // Get the model file as a string
+ URL apexModelURL = ResourceUtils.getLocalFile(modelFileName);
+ if (apexModelURL == null) {
+ apexModelURL = ResourceUtils.getURLResource(modelFileName);
+ if (apexModelURL == null) {
+ LOGGER.error("cound not create apex model, could not read from XML file {}", modelFileName);
+ throw new ApexDeploymentException(
+ "cound not create apex model, could not read XML file " + modelFileName);
+ }
+ }
+
+ deployModel(modelFileName, apexModelURL.openStream(), ignoreConflicts, force);
+ }
+
+ /**
+ * Deploy an Apex model on the Apex engine service.
+ *
+ * @param modelFileName the name of the model file containing the model to deploy
+ * @param modelInputStream the stream that holds the Apex model
+ * @param ignoreConflicts true if conflicts between context in polices is to be ignored
+ * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @throws ApexException on model deployment errors
+ */
+ public void deployModel(final String modelFileName, final InputStream modelInputStream,
+ final boolean ignoreConflicts, final boolean force) throws ApexException {
+ // Read the policy model from the stream
+ final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
+ modelReader.setValidateFlag(!ignoreConflicts);
+ final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream);
+ if (apexPolicyModel == null) {
+ LOGGER.error("cound not create apex model, could not read model stream");
+ throw new ApexDeploymentException("cound not create apex model, could not read model stream");
+ }
+
+ // Deploy the model
+ deployModel(apexPolicyModel, ignoreConflicts, force);
+ }
+
+ /**
+ * Deploy an Apex model on the Apex engine service.
+ *
+ * @param apexPolicyModel the name of the model to deploy
+ * @param ignoreConflicts true if conflicts between context in polices is to be ignored
+ * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @throws ApexException on model deployment errors
+ */
+ public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
+ throws ApexException {
+ // Write the model into a byte array
+ final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
+ final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class);
+ modelWriter.write(apexPolicyModel, baOutputStream);
+
+ // Create and send Update message
+ final UpdateModel umMessage =
+ new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force);
+
+ LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port);
+ client.sendMessage(umMessage);
+ LOGGER.debug("sent update message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(umMessage);
+ if (response.isSuccessful()) {
+ LOGGER.debug(response.toString());
+ } else {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+ }
+
+ /**
+ * Start an Apex engine on the engine service.
+ *
+ * @param engineKey the key of the engine to start
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
+ final StartEngine startEngineMessage = new StartEngine(engineKey);
+ LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port);
+ client.sendMessage(startEngineMessage);
+ LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(startEngineMessage);
+ if (response.isSuccessful()) {
+ LOGGER.debug(response.toString());
+ } else {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexDeploymentException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+ }
+
+ /**
+ * Stop an Apex engine on the engine service.
+ *
+ * @param engineKey the key of the engine to stop
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
+ final StopEngine stopEngineMessage = new StopEngine(engineKey);
+ LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port);
+ client.sendMessage(stopEngineMessage);
+ LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(stopEngineMessage);
+ if (response.isSuccessful()) {
+ LOGGER.debug(response.toString());
+ } else {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexDeploymentException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+ }
+
+ /**
+ * Start periodic events on an Apex engine on the engine service.
+ *
+ * @param engineKey the key of the engine to start periodic events on
+ * @param period the period in milliseconds between periodic events
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException {
+ final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey);
+ startPerioidicEventsMessage.setMessageData(Long.toString(period));
+ LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName,
+ port);
+ client.sendMessage(startPerioidicEventsMessage);
+ LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(startPerioidicEventsMessage);
+ if (response.isSuccessful()) {
+ LOGGER.debug(response.toString());
+ } else {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexDeploymentException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+ }
+
+ /**
+ * Stop periodic events on an Apex engine on the engine service.
+ *
+ * @param engineKey the key of the engine to stop periodic events on
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException {
+ final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey);
+ LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName,
+ port);
+ client.sendMessage(stopPerioidicEventsMessage);
+ LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(stopPerioidicEventsMessage);
+ if (response.isSuccessful()) {
+ LOGGER.debug(response.toString());
+ } else {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexDeploymentException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+ }
+
+ /**
+ * Get the status of an Apex engine.
+ *
+ * @param engineKey the key of the engine to get the status of
+ * @return an engine model containing the status of the engine for the given key
+ * @throws ApexException the apex exception
+ */
+ public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException {
+ final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey);
+ LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port);
+ client.sendMessage(engineStatusMessage);
+ LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(engineStatusMessage);
+ if (!response.isSuccessful()) {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+
+ final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes());
+ final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class);
+ modelReader.setValidateFlag(false);
+ return modelReader.read(baInputStream);
+ }
+
+ /**
+ * Get the runtime information of an Apex engine.
+ *
+ * @param engineKey the key of the engine to get information for
+ * @return an engine model containing information on the engine for the given key
+ * @throws ApexException the apex exception
+ */
+ public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException {
+ final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey);
+ LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName,
+ port);
+ client.sendMessage(engineInfoMessage);
+ LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port);
+
+ // Check if we got a response
+ final Response response = getResponse(engineInfoMessage);
+ if (!response.isSuccessful()) {
+ LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port);
+ throw new ApexException(
+ "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port);
+ }
+
+ return response.getMessageData();
+ }
+
+ /**
+ * Check the response to a model deployment message from the Apex server.
+ *
+ * @param sentMessage the sent message
+ * @return the response message
+ * @throws ApexDeploymentException the apex deployment exception
+ */
+ private Response getResponse(final Message sentMessage) throws ApexDeploymentException {
+ // Get the amount of milliseconds we should wait for a timeout
+ int timeoutTime = sentMessage.getReplyTimeout();
+ if (timeoutTime <= 0) {
+ timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT;
+ }
+
+ // Wait for the required amount of milliseconds for the response from the Apex server
+ Message receivedMessage = null;
+ for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar +=
+ REPLY_MESSAGE_TIMEOUT_INCREMENT) {
+ try {
+ receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
+ throw new ApexDeploymentException(
+ "reception of response from server interrupted " + hostName + ':' + port, e);
+ }
+ }
+
+ // Check if response to sent message
+ if (receivedMessage == null) {
+ LOGGER.warn("no response received to sent message " + sentMessage.getAction());
+ throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction());
+ }
+
+ // Check instance is a response message
+ if (!(receivedMessage instanceof Response)) {
+ LOGGER.warn("response received from server is of incorrect type {}, should be of type {}",
+ receivedMessage.getClass().getName(), Response.class.getName());
+ throw new ApexDeploymentException("response received from server is of incorrect type "
+ + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName());
+ }
+
+ // Cast the response message
+ final Response responseMessage = (Response) receivedMessage;
+
+ // Check if response to sent message
+ if (!responseMessage.getResponseTo().equals(sentMessage)) {
+ LOGGER.warn("response received is not response to sent message " + sentMessage.getAction());
+ throw new ApexDeploymentException(
+ "response received is not correct response to sent message " + sentMessage.getAction());
+ }
+
+ // Check if successful
+ if (responseMessage.isSuccessful()) {
+ LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(),
+ responseMessage.getMessageData());
+ } else {
+ LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(),
+ responseMessage.getMessageData());
+ }
+
+ return responseMessage;
+ }
+}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java
new file mode 100644
index 000000000..bfaece4c6
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.deployment;
+
+import java.util.Arrays;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This utility class is used to start and stop periodic events on Apex engines over the EngDep protocol.
+ */
+public class PeriodicEventManager {
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class);
+
+ private static final int NUM_ARGUMENTS = 4;
+ private static final int PERIODIC_EVENT_INTERVAL = 3;
+
+ // The facade that is handling messaging to the engine service
+ private EngineServiceFacade engineServiceFacade = null;
+
+ /**
+ * The main method, reads the Apex server host address, port and location of the Apex model XML file from the
+ * command line arguments.
+ *
+ * @param args the arguments that specify the Apex engine and the Apex model file
+ */
+ public static void main(final String[] args) {
+ if (args.length != NUM_ARGUMENTS) {
+ LOGGER.error("invalid arguments: " + Arrays.toString(args));
+ LOGGER.error("usage: Deployer <server address> <port address> <start/stop> <periods in ms>");
+ return;
+ }
+
+ PeriodicEventManager deployer = null;
+ try {
+ // Use a Deployer object to handle model deployment
+ deployer = new PeriodicEventManager(args[0], Integer.parseInt(args[1]));
+ deployer.init();
+ if (args[2].equalsIgnoreCase("start")) {
+ deployer.startPerioidicEvents(Long.parseLong(args[PERIODIC_EVENT_INTERVAL]));
+ } else {
+ deployer.stopPerioidicEvents();
+ }
+ } catch (final ApexException e) {
+ LOGGER.error("model deployment failed on parameters {}", args, e);
+ } finally {
+ if (deployer != null) {
+ deployer.close();
+ }
+ }
+ }
+
+ /**
+ * Instantiates a new deployer.
+ *
+ * @param hostName the host name of the host running the Apex Engine
+ * @param port the port to use for EngDep communication with the Apex engine
+ */
+ public PeriodicEventManager(final String hostName, final int port) {
+ engineServiceFacade = new EngineServiceFacade(hostName, port);
+ }
+
+ /**
+ * Initializes the deployer, opens an EngDep communication session with the Apex engine.
+ *
+ * @throws ApexDeploymentException thrown on deployment and communication errors
+ */
+ public void init() throws ApexDeploymentException {
+ engineServiceFacade.init();
+ }
+
+ /**
+ * Close the EngDep connection to the Apex server.
+ */
+ public void close() {
+ engineServiceFacade.close();
+ }
+
+ /**
+ * Start the Apex engines on the engine service.
+ *
+ * @param period the interval in milliseconds between periodic events
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void startPerioidicEvents(final long period) throws ApexDeploymentException {
+ for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
+ engineServiceFacade.startPerioidicEvents(engineKey, period);
+ }
+ }
+
+ /**
+ * Stop the Apex engines on the engine service.
+ *
+ * @throws ApexDeploymentException on messaging errors
+ */
+ public void stopPerioidicEvents() throws ApexDeploymentException {
+ for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
+ engineServiceFacade.stopPerioidicEvents(engineKey);
+ }
+ }
+}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java
new file mode 100644
index 000000000..b2b7fda2d
--- /dev/null
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides a facade and client that allows Apex engines to be managed and monitored over the EngDep protocol. Some
+ * utility classes for deployment are also provided.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.deployment;