summaryrefslogtreecommitdiffstats
path: root/core/core-deployment/src/main/java/org
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@ericsson.com>2018-09-27 22:42:56 +0100
committerliamfallon <liam.fallon@ericsson.com>2018-09-27 22:43:05 +0100
commitd67a4dbff408ebf73a25158c85169fb7165e357e (patch)
tree8258809f1cf086c02edbc7f321ddd0d9d4218d47 /core/core-deployment/src/main/java/org
parentafd3a44e210de3c1aed9f6f7278913b2a2f2f6d2 (diff)
Unit test for core deployment
This module was not covered by unit test. Issue-ID: POLICY-1034 Change-Id: I700d43c0bde58c2b236f46994f380e20116418d9 Signed-off-by: liamfallon <liam.fallon@ericsson.com>
Diffstat (limited to 'core/core-deployment/src/main/java/org')
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java73
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java86
2 files changed, 99 insertions, 60 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
index d0af94930..0df0166df 100644
--- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
@@ -25,6 +25,7 @@ import com.google.common.eventbus.Subscribe;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
@@ -38,10 +39,9 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class DeploymentClient handles the client side of an EngDep communication session with an
- * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending
- * queue to queue messages for sending by the client thread and a receiving queue to queue messages
- * received from the Apex engine.
+ * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
+ * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
+ * client thread and a receiving queue to queue messages received from the Apex engine.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -49,6 +49,7 @@ public class DeploymentClient implements Runnable {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
+ private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50;
// Host and port to use for EngDep messaging
private String host = null;
@@ -65,6 +66,10 @@ public class DeploymentClient implements Runnable {
// Thread management fields
private boolean started = false;
private Thread thisThread = null;
+
+ // Number of messages processed
+ private long messagesSent = 0;
+ private long messagesReceived = 0;
/**
* Instantiates a new deployment client.
@@ -104,10 +109,18 @@ public class DeploymentClient implements Runnable {
return;
}
// Loop forever, sending messages as they appear on the queue
- while (started) {
+ while (started && !thisThread.isInterrupted()) {
try {
- final Message messageForSending = sendQueue.take();
- sendMessage(messageForSending);
+ final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (messageForSending == null) {
+ continue;
+ }
+
+ // Send the message in its message holder
+ final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
+ messageHolder.addMessage(messageForSending);
+ service.send(messageHolder);
+ messagesSent++;
} catch (final InterruptedException e) {
// Message sending has been interrupted, we are finished
LOGGER.debug("engine<-->deployment client interrupted");
@@ -128,11 +141,7 @@ public class DeploymentClient implements Runnable {
* @param message the message to send to the Apex server
*/
public void sendMessage(final Message message) {
- final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
-
- // Send the message in its message holder
- messageHolder.addMessage(message);
- service.send(messageHolder);
+ sendQueue.add(message);
}
/**
@@ -146,7 +155,9 @@ public class DeploymentClient implements Runnable {
ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
// Close the Web Services connection
- service.stopConnection();
+ if (service != null) {
+ service.stopConnection();
+ }
started = false;
LOGGER.debug("engine<-->deployment test client stopped . . .");
}
@@ -170,11 +181,26 @@ public class DeploymentClient implements Runnable {
}
/**
- * The listener interface for receiving deploymentClient events. The class that is interested in
- * processing a deploymentClient event implements this interface, and the object created with
- * that class is registered with a component using the component's
- * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that
- * object's appropriate method is invoked.
+ * Get the number of messages received by the client.
+ * @return the number of messages received by the client
+ */
+ public long getMessagesReceived() {
+ return messagesReceived;
+ }
+
+ /**
+ * Get the number of messages sent by the client.
+ * @return the number of messages sent by the client
+ */
+ public long getMessagesSent() {
+ return messagesSent;
+ }
+
+ /**
+ * The listener interface for receiving deploymentClient events. The class that is interested in processing a
+ * deploymentClient event implements this interface, and the object created with that class is registered with a
+ * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
+ * occurs, that object's appropriate method is invoked.
*
* @see DeploymentClientEvent
*/
@@ -182,25 +208,24 @@ public class DeploymentClient implements Runnable {
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.
- * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. policy.apex.core.
+ * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
*/
@Subscribe
@Override
public void onMessage(final MessageBlock<Message> messageData) {
+ messagesReceived++;
receiveQueue.addAll(messageData.getMessages());
}
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
- * String)
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String)
*/
@Override
public void onMessage(final String messageString) {
+ messagesReceived++;
throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");
}
}
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java
index 5f0752aa0..533db0b17 100644
--- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java
@@ -20,6 +20,7 @@
package org.onap.policy.apex.core.deployment;
+import java.io.PrintStream;
import java.util.Arrays;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
@@ -35,19 +36,39 @@ public class PeriodicEventManager {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class);
private static final int NUM_ARGUMENTS = 4;
- private static final int PERIODIC_EVENT_INTERVAL = 3;
// The facade that is handling messaging to the engine service
private EngineServiceFacade engineServiceFacade = null;
+ // Host name and port of the Apex service
+ private String hostName;
+ private int port;
+
+ // Should we start or stop periodic events
+ private boolean startFlag;
+
+ // The period for periodic events
+ private long period;
+
/**
- * Instantiates a new deployer.
- *
- * @param hostName the host name of the host running the Apex Engine
- * @param port the port to use for EngDep communication with the Apex engine
+ * Instantiates a new periodic event manager.
+ *
+ * @param args the command parameters
+ * @param outputStream the output stream
*/
- public PeriodicEventManager(final String hostName, final int port) {
- engineServiceFacade = new EngineServiceFacade(hostName, port);
+ public PeriodicEventManager(final String[] args, final PrintStream outputStream) {
+ if (args.length != NUM_ARGUMENTS) {
+ String message = "invalid arguments: " + Arrays.toString(args)
+ + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>";
+ LOGGER.error(message);
+ outputStream.println(message);
+ return;
+ }
+
+ this.hostName = args[0];
+ this.port = Integer.parseInt(args[1]);
+ this.startFlag = "start".equalsIgnoreCase(args[2]);
+ this.period = Long.parseLong(args[3]);
}
/**
@@ -56,13 +77,27 @@ public class PeriodicEventManager {
* @throws ApexDeploymentException thrown on deployment and communication errors
*/
public void init() throws ApexDeploymentException {
- engineServiceFacade.init();
+ try {
+ // Use an engine service facade to handle model deployment
+ engineServiceFacade = new EngineServiceFacade(hostName, port);
+ engineServiceFacade.init();
+
+ if (startFlag) {
+ startPerioidicEvents(period);
+ } else {
+ stopPerioidicEvents();
+ }
+ } catch (final ApexException e) {
+ LOGGER.error("model deployment failed on parameters {} {} {}", hostName, port, startFlag, e);
+ } finally {
+ close();
+ }
}
/**
* Close the EngDep connection to the Apex server.
*/
- public void close() {
+ private void close() {
engineServiceFacade.close();
}
@@ -72,7 +107,7 @@ public class PeriodicEventManager {
* @param period the interval in milliseconds between periodic events
* @throws ApexDeploymentException on messaging errors
*/
- public void startPerioidicEvents(final long period) throws ApexDeploymentException {
+ private void startPerioidicEvents(final long period) throws ApexDeploymentException {
for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
engineServiceFacade.startPerioidicEvents(engineKey, period);
}
@@ -83,7 +118,7 @@ public class PeriodicEventManager {
*
* @throws ApexDeploymentException on messaging errors
*/
- public void stopPerioidicEvents() throws ApexDeploymentException {
+ private void stopPerioidicEvents() throws ApexDeploymentException {
for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) {
engineServiceFacade.stopPerioidicEvents(engineKey);
}
@@ -94,31 +129,10 @@ public class PeriodicEventManager {
* command line arguments.
*
* @param args the arguments that specify the Apex engine and the Apex model file
+ * @throws ApexDeploymentException on deployment errors
*/
- public static void main(final String[] args) {
- if (args.length != NUM_ARGUMENTS) {
- String message = "invalid arguments: " + Arrays.toString(args)
- + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>";
- LOGGER.error(message);
- return;
- }
-
- PeriodicEventManager deployer = null;
- try {
- // Use a Deployer object to handle model deployment
- deployer = new PeriodicEventManager(args[0], Integer.parseInt(args[1]));
- deployer.init();
- if ("start".equalsIgnoreCase(args[2])) {
- deployer.startPerioidicEvents(Long.parseLong(args[PERIODIC_EVENT_INTERVAL]));
- } else {
- deployer.stopPerioidicEvents();
- }
- } catch (final ApexException e) {
- LOGGER.error("model deployment failed on parameters {}", args, e);
- } finally {
- if (deployer != null) {
- deployer.close();
- }
- }
+ public static void main(final String[] args) throws ApexDeploymentException {
+ PeriodicEventManager peManager = new PeriodicEventManager(args, System.out);
+ peManager.init();
}
}