diff options
author | ramverma <ram.krishna.verma@ericsson.com> | 2018-06-01 11:51:36 +0100 |
---|---|---|
committer | ramverma <ram.krishna.verma@ericsson.com> | 2018-06-04 10:50:44 +0100 |
commit | 37d6fd9069eb30d88c4ad80b5f35099ed173cc13 (patch) | |
tree | 0f30a71577644047feee43bd8857dc5a82a51c87 /core/core-deployment/src | |
parent | 5722440b2eb8ff1923dda9d4d856f0adc1ac8e6f (diff) |
Adding apex core module to apex-pdp
Change-Id: I4bfe1df3e44fe62ff6789e813e59836e267ab3b2
Issue-ID: POLICY-858
Signed-off-by: ramverma <ram.krishna.verma@ericsson.com>
Diffstat (limited to 'core/core-deployment/src')
6 files changed, 1038 insertions, 0 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java new file mode 100644 index 000000000..e932bbd45 --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/ApexDeploymentException.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; + +/** + * The Class ApexDeploymentException is an exception that may be thrown on deployment errors in Apex. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexDeploymentException extends ApexException { + private static final long serialVersionUID = 1816909564890470707L; + + /** + * Instantiates a new apex deployment exception. + * + * @param message the message + */ + public ApexDeploymentException(final String message) { + super(message); + } + + /** + * Instantiates a new apex deployment exception. + * + * @param message the message + * @param e the e + */ + public ApexDeploymentException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java new file mode 100644 index 000000000..499644fd9 --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/BatchDeployer.java @@ -0,0 +1,154 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import java.io.IOException; +import java.util.Arrays; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class {@link BatchDeployer} deploys an Apex model held as an XML or Json file onto an Apex engine. It uses the + * EngDep protocol to communicate with the engine, with the EngDep protocol being carried on Java web sockets. + * + * This deployer is a simple command line deployer that reads the communication parameters and the location of the Apex + * model file as arguments. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class BatchDeployer { + private static final int NUM_ARGUMENTS = 3; + + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class); + + // The facade that is handling messaging to the engine service + private EngineServiceFacade engineServiceFacade = null; + + /** + * The main method, reads the Apex server host address, port and location of the Apex model file from the command + * line arguments. + * + * @param args the arguments that specify the Apex engine and the Apex model file + */ + public static void main(final String[] args) { + if (args.length != NUM_ARGUMENTS) { + LOGGER.error("invalid arguments: " + Arrays.toString(args)); + LOGGER.error("usage: Deployer <server address> <port address> <Apex Model file location>"); + return; + } + + BatchDeployer deployer = null; + try { + // Use a Deployer object to handle model deployment + deployer = new BatchDeployer(args[0], Integer.parseInt(args[1])); + deployer.init(); + deployer.deployModel(args[2], false, false); + deployer.startEngines(); + } catch (final ApexException | IOException e) { + LOGGER.error("model deployment failed on parameters {}", args, e); + } finally { + if (deployer != null) { + deployer.close(); + } + } + } + + /** + * Instantiates a new deployer. + * + * @param hostName the host name of the host running the Apex Engine + * @param port the port to use for EngDep communication with the Apex engine + */ + public BatchDeployer(final String hostName, final int port) { + engineServiceFacade = new EngineServiceFacade(hostName, port); + } + + /** + * Initializes the deployer, opens an EngDep communication session with the Apex engine. + * + * @throws ApexDeploymentException thrown on deployment and communication errors + */ + public void init() throws ApexDeploymentException { + engineServiceFacade.init(); + } + + /** + * Close the EngDep connection to the Apex server. + */ + public void close() { + engineServiceFacade.close(); + } + + /** + * Deploy an Apex model on the Apex server. + * + * @param modelFileName the name of the model file containing the model to deploy + * @param ignoreConflicts true if conflicts between context in polices is to be ignored + * @param force true if the model is to be applied even if it is incompatible with the existing model + * @throws ApexException on Apex errors + * @throws IOException on IO exceptions from the operating system + */ + public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force) + throws ApexException, IOException { + engineServiceFacade.deployModel(modelFileName, ignoreConflicts, force); + } + + /** + * Deploy an Apex model on the Apex server. + * + * @param policyModel the model to deploy + * @param ignoreConflicts true if conflicts between context in polices is to be ignored + * @param force true if the model is to be applied even if it is incompatible with the existing model + * @throws ApexException on Apex errors + * @throws IOException on IO exceptions from the operating system + */ + public void deployModel(final AxPolicyModel policyModel, final boolean ignoreConflicts, final boolean force) + throws ApexException, IOException { + engineServiceFacade.deployModel(policyModel, ignoreConflicts, force); + } + + /** + * Start the Apex engines on the engine service. + * + * @throws ApexDeploymentException on messaging errors + */ + public void startEngines() throws ApexDeploymentException { + for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { + engineServiceFacade.startEngine(engineKey); + } + } + + /** + * Stop the Apex engines on the engine service. + * + * @throws ApexDeploymentException on messaging errors + */ + public void stopEngines() throws ApexDeploymentException { + for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { + engineServiceFacade.stopEngine(engineKey); + } + } +} diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java new file mode 100644 index 000000000..c2a19a167 --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java @@ -0,0 +1,202 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import com.google.common.eventbus.Subscribe; + +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.core.protocols.Message; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a + * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the + * client thread and a receiving queue to queue messages received from the Apex engine. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class DeploymentClient implements Runnable { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class); + + private static final int CLIENT_STOP_WAIT_INTERVAL = 100; + + // Host and port to use for EngDep messaging + private String host = null; + private int port = 0; + + // Messaging service is used to transmit and receive messages over the web socket + private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>(); + private MessagingService<Message> service = null; + + // Send and receive queues for message buffering + private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>(); + + // Thread management fields + private boolean started = false; + private Thread thisThread = null; + + /** + * Instantiates a new deployment client. + * + * @param host the host name that the EngDep server is running on + * @param port the port the port the EngDep server is using + */ + public DeploymentClient(final String host, final int port) { + this.host = host; + this.port = port; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + LOGGER.debug("engine<-->deployment to \"ws://" + host + ":" + port + "\" thread starting . . ."); + + // Set up the thread name + thisThread = Thread.currentThread(); + thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port); + + try { + // Establish a connection to the Apex server for EngDep message communication over Web Sockets + service = factory.createClient(new URI("ws://" + host + ":" + port)); + service.addMessageListener(new DeploymentClientListener()); + + service.startConnection(); + started = true; + LOGGER.debug("engine<-->deployment client thread started"); + } catch (final Exception e) { + LOGGER.error("engine<-->deployment client thread exception", e); + return; + } + + // Loop forever, sending messages as they appear on the queue + while (true) { + try { + final Message messageForSending = sendQueue.take(); + sendMessage(messageForSending); + } catch (final InterruptedException e) { + // Message sending has been interrupted, we are finished + LOGGER.debug("engine<-->deployment client interrupted"); + break; + } + } + + // Thread has been interrupted + thisThread = null; + LOGGER.debug("engine<-->deployment client thread finished"); + } + + /** + * Send an EngDep message to the Apex server. + * + * @param message the message to send to the Apex server + */ + public void sendMessage(final Message message) { + final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost()); + + // Send the message in its message holder + messageHolder.addMessage(message); + service.send(messageHolder); + } + + /** + * Stop the deployment client. + */ + public void stopClient() { + LOGGER.debug("engine<-->deployment test client stopping . . ."); + thisThread.interrupt(); + + // Wait for the thread to stop + while (thisThread != null && thisThread.isAlive()) { + ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL); + } + + // Close the Web Services connection + service.stopConnection(); + started = false; + LOGGER.debug("engine<-->deployment test client stopped . . ."); + } + + /** + * Checks if the client thread is started. + * + * @return true, if the client thread is started + */ + public boolean isStarted() { + return started; + } + + /** + * Allows users of this class to get a reference to the receive queue to receove messages. + * + * @return the receive queue + */ + public BlockingQueue<Message> getReceiveQueue() { + return receiveQueue; + } + + /** + * The listener interface for receiving deploymentClient events. The class that is interested in processing a + * deploymentClient event implements this interface, and the object created with that class is registered with a + * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event + * occurs, that object's appropriate method is invoked. + * + * @see DeploymentClientEvent + */ + private class DeploymentClientListener implements MessageListener<Message> { + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core. + * infrastructure.messaging.impl.ws.messageblock. MessageBlock) + */ + @Subscribe + @Override + public void onMessage(final MessageBlock<Message> messageData) { + receiveQueue.addAll(messageData.getMessages()); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String) + */ + @Override + public void onMessage(final String messageString) { + throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol"); + } + } +} diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java new file mode 100644 index 000000000..d954feaa3 --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java @@ -0,0 +1,480 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.core.protocols.Message; +import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse; +import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo; +import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo; +import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus; +import org.onap.policy.apex.core.protocols.engdep.messages.Response; +import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine; +import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents; +import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine; +import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents; +import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel; +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader; +import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter; +import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.utilities.ResourceUtils; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to + * communicate with the engine, with the EngDep protocol being carried on Java web sockets. + * + * This deployer is a simple command line deployer that reads the communication parameters and the location of the XML + * model file as arguments. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EngineServiceFacade { + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class); + + // The default message timeout and timeout increment (the amount of time between polls) in milliseconds + private static final int CLIENT_START_WAIT_INTERVAL = 100; + private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000; + private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100; + + // The Apex engine host and EngDep port + private final String hostName; + private final int port; + + // The deployment client handles the EngDep communication session towards the Apex server + private DeploymentClient client = null; + private Thread clientThread = null; + + // Information about the Engine service we are connected to + private AxArtifactKey engineServiceKey = null; + private AxArtifactKey[] engineKeyArray = null; + private AxArtifactKey apexModelKey = null; + + /** + * Instantiates a new deployer. + * + * @param hostName the host name of the host running the Apex Engine + * @param port the port to use for EngDep communication with the Apex engine + */ + public EngineServiceFacade(final String hostName, final int port) { + this.hostName = hostName; + this.port = port; + } + + /** + * Initializes the facade, opens an EngDep communication session with the Apex engine. + * + * @throws ApexDeploymentException thrown on deployment and communication errors + */ + public void init() throws ApexDeploymentException { + try { + LOGGER.debug("handshaking with server {}:{} . . .", hostName, port); + + // Use the deployment client to handle the EngDep communication towards the Apex server. It runs a thread to + // monitor the session and to send + // messages + client = new DeploymentClient(hostName, port); + clientThread = new Thread(client); + clientThread.start(); + + // Wait for the connection to come up + while (!client.isStarted()) { + if (clientThread.isAlive()) { + ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL); + } else { + LOGGER.error("cound not handshake with server {}:{}", hostName, port); + throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port); + } + } + + LOGGER.debug("opened connection to server {}:{} . . .", hostName, port); + + // Get engine service information to see what engines we're dealing with + final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null); + LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo, + hostName, port); + client.sendMessage(engineServiceInfo); + LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port); + + final EngineServiceInfoResponse engineServiceInfoResponse = + (EngineServiceInfoResponse) getResponse(engineServiceInfo); + if (engineServiceInfoResponse.isSuccessful()) { + engineServiceKey = engineServiceInfoResponse.getEngineServiceKey(); + engineKeyArray = engineServiceInfoResponse.getEngineKeyArray(); + apexModelKey = engineServiceInfoResponse.getApexModelKey(); + } + } catch (final Exception e) { + LOGGER.error("cound not handshake with server {}:{}", hostName, port, e); + client.stopClient(); + throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e); + } + + } + + /** + * Get the engine service key. + * + * @return the engine service key + */ + public AxArtifactKey getApexModelKey() { + return apexModelKey; + } + + /** + * Get the keys of the engines on this engine service. + * + * @return the engine key array + */ + public AxArtifactKey[] getEngineKeyArray() { + return engineKeyArray; + } + + /** + * Get the engine service key. + * + * @return the engine service key + */ + public AxArtifactKey getKey() { + return engineServiceKey; + } + + /** + * Close the EngDep connection to the Apex server. + */ + public void close() { + LOGGER.debug("closing connection to server {}:{} . . .", hostName, port); + + client.stopClient(); + + LOGGER.debug("closed connection to server {}:{} . . .", hostName, port); + } + + /** + * Deploy an Apex model on the Apex engine service. + * + * @param modelFileName the name of the model file containing the model to deploy + * @param ignoreConflicts true if conflicts between context in polices is to be ignored + * @param force true if the model is to be applied even if it is incompatible with the existing model + * @throws ApexException on Apex errors + * @throws IOException on IO exceptions from the operating system + */ + public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force) + throws ApexException, IOException { + if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) { + LOGGER.error("cound not deploy apex model, deployer is not initialized"); + throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized"); + } + + // Get the model file as a string + URL apexModelURL = ResourceUtils.getLocalFile(modelFileName); + if (apexModelURL == null) { + apexModelURL = ResourceUtils.getURLResource(modelFileName); + if (apexModelURL == null) { + LOGGER.error("cound not create apex model, could not read from XML file {}", modelFileName); + throw new ApexDeploymentException( + "cound not create apex model, could not read XML file " + modelFileName); + } + } + + deployModel(modelFileName, apexModelURL.openStream(), ignoreConflicts, force); + } + + /** + * Deploy an Apex model on the Apex engine service. + * + * @param modelFileName the name of the model file containing the model to deploy + * @param modelInputStream the stream that holds the Apex model + * @param ignoreConflicts true if conflicts between context in polices is to be ignored + * @param force true if the model is to be applied even if it is incompatible with the existing model + * @throws ApexException on model deployment errors + */ + public void deployModel(final String modelFileName, final InputStream modelInputStream, + final boolean ignoreConflicts, final boolean force) throws ApexException { + // Read the policy model from the stream + final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class); + modelReader.setValidateFlag(!ignoreConflicts); + final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream); + if (apexPolicyModel == null) { + LOGGER.error("cound not create apex model, could not read model stream"); + throw new ApexDeploymentException("cound not create apex model, could not read model stream"); + } + + // Deploy the model + deployModel(apexPolicyModel, ignoreConflicts, force); + } + + /** + * Deploy an Apex model on the Apex engine service. + * + * @param apexPolicyModel the name of the model to deploy + * @param ignoreConflicts true if conflicts between context in polices is to be ignored + * @param force true if the model is to be applied even if it is incompatible with the existing model + * @throws ApexException on model deployment errors + */ + public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force) + throws ApexException { + // Write the model into a byte array + final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); + final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class); + modelWriter.write(apexPolicyModel, baOutputStream); + + // Create and send Update message + final UpdateModel umMessage = + new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force); + + LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port); + client.sendMessage(umMessage); + LOGGER.debug("sent update message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(umMessage); + if (response.isSuccessful()) { + LOGGER.debug(response.toString()); + } else { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + } + + /** + * Start an Apex engine on the engine service. + * + * @param engineKey the key of the engine to start + * @throws ApexDeploymentException on messaging errors + */ + public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException { + final StartEngine startEngineMessage = new StartEngine(engineKey); + LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port); + client.sendMessage(startEngineMessage); + LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(startEngineMessage); + if (response.isSuccessful()) { + LOGGER.debug(response.toString()); + } else { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexDeploymentException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + } + + /** + * Stop an Apex engine on the engine service. + * + * @param engineKey the key of the engine to stop + * @throws ApexDeploymentException on messaging errors + */ + public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException { + final StopEngine stopEngineMessage = new StopEngine(engineKey); + LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port); + client.sendMessage(stopEngineMessage); + LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(stopEngineMessage); + if (response.isSuccessful()) { + LOGGER.debug(response.toString()); + } else { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexDeploymentException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + } + + /** + * Start periodic events on an Apex engine on the engine service. + * + * @param engineKey the key of the engine to start periodic events on + * @param period the period in milliseconds between periodic events + * @throws ApexDeploymentException on messaging errors + */ + public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException { + final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey); + startPerioidicEventsMessage.setMessageData(Long.toString(period)); + LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName, + port); + client.sendMessage(startPerioidicEventsMessage); + LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(startPerioidicEventsMessage); + if (response.isSuccessful()) { + LOGGER.debug(response.toString()); + } else { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexDeploymentException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + } + + /** + * Stop periodic events on an Apex engine on the engine service. + * + * @param engineKey the key of the engine to stop periodic events on + * @throws ApexDeploymentException on messaging errors + */ + public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException { + final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey); + LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName, + port); + client.sendMessage(stopPerioidicEventsMessage); + LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(stopPerioidicEventsMessage); + if (response.isSuccessful()) { + LOGGER.debug(response.toString()); + } else { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexDeploymentException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + } + + /** + * Get the status of an Apex engine. + * + * @param engineKey the key of the engine to get the status of + * @return an engine model containing the status of the engine for the given key + * @throws ApexException the apex exception + */ + public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException { + final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey); + LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port); + client.sendMessage(engineStatusMessage); + LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(engineStatusMessage); + if (!response.isSuccessful()) { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + + final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes()); + final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class); + modelReader.setValidateFlag(false); + return modelReader.read(baInputStream); + } + + /** + * Get the runtime information of an Apex engine. + * + * @param engineKey the key of the engine to get information for + * @return an engine model containing information on the engine for the given key + * @throws ApexException the apex exception + */ + public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException { + final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey); + LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName, + port); + client.sendMessage(engineInfoMessage); + LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port); + + // Check if we got a response + final Response response = getResponse(engineInfoMessage); + if (!response.isSuccessful()) { + LOGGER.warn("failed response {} received from server {}:{}", response.getMessageData(), hostName, port); + throw new ApexException( + "failed response " + response.getMessageData() + " received from server" + hostName + ':' + port); + } + + return response.getMessageData(); + } + + /** + * Check the response to a model deployment message from the Apex server. + * + * @param sentMessage the sent message + * @return the response message + * @throws ApexDeploymentException the apex deployment exception + */ + private Response getResponse(final Message sentMessage) throws ApexDeploymentException { + // Get the amount of milliseconds we should wait for a timeout + int timeoutTime = sentMessage.getReplyTimeout(); + if (timeoutTime <= 0) { + timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT; + } + + // Wait for the required amount of milliseconds for the response from the Apex server + Message receivedMessage = null; + for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar += + REPLY_MESSAGE_TIMEOUT_INCREMENT) { + try { + receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e); + throw new ApexDeploymentException( + "reception of response from server interrupted " + hostName + ':' + port, e); + } + } + + // Check if response to sent message + if (receivedMessage == null) { + LOGGER.warn("no response received to sent message " + sentMessage.getAction()); + throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction()); + } + + // Check instance is a response message + if (!(receivedMessage instanceof Response)) { + LOGGER.warn("response received from server is of incorrect type {}, should be of type {}", + receivedMessage.getClass().getName(), Response.class.getName()); + throw new ApexDeploymentException("response received from server is of incorrect type " + + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName()); + } + + // Cast the response message + final Response responseMessage = (Response) receivedMessage; + + // Check if response to sent message + if (!responseMessage.getResponseTo().equals(sentMessage)) { + LOGGER.warn("response received is not response to sent message " + sentMessage.getAction()); + throw new ApexDeploymentException( + "response received is not correct response to sent message " + sentMessage.getAction()); + } + + // Check if successful + if (responseMessage.isSuccessful()) { + LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(), + responseMessage.getMessageData()); + } else { + LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(), + responseMessage.getMessageData()); + } + + return responseMessage; + } +} diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java new file mode 100644 index 000000000..bfaece4c6 --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java @@ -0,0 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import java.util.Arrays; + +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This utility class is used to start and stop periodic events on Apex engines over the EngDep protocol. + */ +public class PeriodicEventManager { + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class); + + private static final int NUM_ARGUMENTS = 4; + private static final int PERIODIC_EVENT_INTERVAL = 3; + + // The facade that is handling messaging to the engine service + private EngineServiceFacade engineServiceFacade = null; + + /** + * The main method, reads the Apex server host address, port and location of the Apex model XML file from the + * command line arguments. + * + * @param args the arguments that specify the Apex engine and the Apex model file + */ + public static void main(final String[] args) { + if (args.length != NUM_ARGUMENTS) { + LOGGER.error("invalid arguments: " + Arrays.toString(args)); + LOGGER.error("usage: Deployer <server address> <port address> <start/stop> <periods in ms>"); + return; + } + + PeriodicEventManager deployer = null; + try { + // Use a Deployer object to handle model deployment + deployer = new PeriodicEventManager(args[0], Integer.parseInt(args[1])); + deployer.init(); + if (args[2].equalsIgnoreCase("start")) { + deployer.startPerioidicEvents(Long.parseLong(args[PERIODIC_EVENT_INTERVAL])); + } else { + deployer.stopPerioidicEvents(); + } + } catch (final ApexException e) { + LOGGER.error("model deployment failed on parameters {}", args, e); + } finally { + if (deployer != null) { + deployer.close(); + } + } + } + + /** + * Instantiates a new deployer. + * + * @param hostName the host name of the host running the Apex Engine + * @param port the port to use for EngDep communication with the Apex engine + */ + public PeriodicEventManager(final String hostName, final int port) { + engineServiceFacade = new EngineServiceFacade(hostName, port); + } + + /** + * Initializes the deployer, opens an EngDep communication session with the Apex engine. + * + * @throws ApexDeploymentException thrown on deployment and communication errors + */ + public void init() throws ApexDeploymentException { + engineServiceFacade.init(); + } + + /** + * Close the EngDep connection to the Apex server. + */ + public void close() { + engineServiceFacade.close(); + } + + /** + * Start the Apex engines on the engine service. + * + * @param period the interval in milliseconds between periodic events + * @throws ApexDeploymentException on messaging errors + */ + public void startPerioidicEvents(final long period) throws ApexDeploymentException { + for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { + engineServiceFacade.startPerioidicEvents(engineKey, period); + } + } + + /** + * Stop the Apex engines on the engine service. + * + * @throws ApexDeploymentException on messaging errors + */ + public void stopPerioidicEvents() throws ApexDeploymentException { + for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { + engineServiceFacade.stopPerioidicEvents(engineKey); + } + } +} diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java new file mode 100644 index 000000000..b2b7fda2d --- /dev/null +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/package-info.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides a facade and client that allows Apex engines to be managed and monitored over the EngDep protocol. Some + * utility classes for deployment are also provided. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.deployment; |