diff options
author | liamfallon <liam.fallon@ericsson.com> | 2018-09-27 22:42:56 +0100 |
---|---|---|
committer | liamfallon <liam.fallon@ericsson.com> | 2018-09-27 22:43:05 +0100 |
commit | d67a4dbff408ebf73a25158c85169fb7165e357e (patch) | |
tree | 8258809f1cf086c02edbc7f321ddd0d9d4218d47 /core/core-deployment/src/main/java/org | |
parent | afd3a44e210de3c1aed9f6f7278913b2a2f2f6d2 (diff) |
Unit test for core deployment
This module was not covered by unit test.
Issue-ID: POLICY-1034
Change-Id: I700d43c0bde58c2b236f46994f380e20116418d9
Signed-off-by: liamfallon <liam.fallon@ericsson.com>
Diffstat (limited to 'core/core-deployment/src/main/java/org')
2 files changed, 99 insertions, 60 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java index d0af94930..0df0166df 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java @@ -25,6 +25,7 @@ import com.google.common.eventbus.Subscribe; import java.net.URI; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; @@ -38,10 +39,9 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class DeploymentClient handles the client side of an EngDep communication session with an - * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending - * queue to queue messages for sending by the client thread and a receiving queue to queue messages - * received from the Apex engine. + * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a + * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the + * client thread and a receiving queue to queue messages received from the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -49,6 +49,7 @@ public class DeploymentClient implements Runnable { private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class); private static final int CLIENT_STOP_WAIT_INTERVAL = 100; + private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50; // Host and port to use for EngDep messaging private String host = null; @@ -65,6 +66,10 @@ public class DeploymentClient implements Runnable { // Thread management fields private boolean started = false; private Thread thisThread = null; + + // Number of messages processed + private long messagesSent = 0; + private long messagesReceived = 0; /** * Instantiates a new deployment client. @@ -104,10 +109,18 @@ public class DeploymentClient implements Runnable { return; } // Loop forever, sending messages as they appear on the queue - while (started) { + while (started && !thisThread.isInterrupted()) { try { - final Message messageForSending = sendQueue.take(); - sendMessage(messageForSending); + final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS); + if (messageForSending == null) { + continue; + } + + // Send the message in its message holder + final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost()); + messageHolder.addMessage(messageForSending); + service.send(messageHolder); + messagesSent++; } catch (final InterruptedException e) { // Message sending has been interrupted, we are finished LOGGER.debug("engine<-->deployment client interrupted"); @@ -128,11 +141,7 @@ public class DeploymentClient implements Runnable { * @param message the message to send to the Apex server */ public void sendMessage(final Message message) { - final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost()); - - // Send the message in its message holder - messageHolder.addMessage(message); - service.send(messageHolder); + sendQueue.add(message); } /** @@ -146,7 +155,9 @@ public class DeploymentClient implements Runnable { ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL); // Close the Web Services connection - service.stopConnection(); + if (service != null) { + service.stopConnection(); + } started = false; LOGGER.debug("engine<-->deployment test client stopped . . ."); } @@ -170,11 +181,26 @@ public class DeploymentClient implements Runnable { } /** - * The listener interface for receiving deploymentClient events. The class that is interested in - * processing a deploymentClient event implements this interface, and the object created with - * that class is registered with a component using the component's - * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that - * object's appropriate method is invoked. + * Get the number of messages received by the client. + * @return the number of messages received by the client + */ + public long getMessagesReceived() { + return messagesReceived; + } + + /** + * Get the number of messages sent by the client. + * @return the number of messages sent by the client + */ + public long getMessagesSent() { + return messagesSent; + } + + /** + * The listener interface for receiving deploymentClient events. The class that is interested in processing a + * deploymentClient event implements this interface, and the object created with that class is registered with a + * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event + * occurs, that object's appropriate method is invoked. * * @see DeploymentClientEvent */ @@ -182,25 +208,24 @@ public class DeploymentClient implements Runnable { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. - * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock) + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. policy.apex.core. + * infrastructure.messaging.impl.ws.messageblock. MessageBlock) */ @Subscribe @Override public void onMessage(final MessageBlock<Message> messageData) { + messagesReceived++; receiveQueue.addAll(messageData.getMessages()); } /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. - * String) + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String) */ @Override public void onMessage(final String messageString) { + messagesReceived++; throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol"); } } diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java index 5f0752aa0..533db0b17 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.core.deployment; +import java.io.PrintStream; import java.util.Arrays; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; @@ -35,19 +36,39 @@ public class PeriodicEventManager { private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class); private static final int NUM_ARGUMENTS = 4; - private static final int PERIODIC_EVENT_INTERVAL = 3; // The facade that is handling messaging to the engine service private EngineServiceFacade engineServiceFacade = null; + // Host name and port of the Apex service + private String hostName; + private int port; + + // Should we start or stop periodic events + private boolean startFlag; + + // The period for periodic events + private long period; + /** - * Instantiates a new deployer. - * - * @param hostName the host name of the host running the Apex Engine - * @param port the port to use for EngDep communication with the Apex engine + * Instantiates a new periodic event manager. + * + * @param args the command parameters + * @param outputStream the output stream */ - public PeriodicEventManager(final String hostName, final int port) { - engineServiceFacade = new EngineServiceFacade(hostName, port); + public PeriodicEventManager(final String[] args, final PrintStream outputStream) { + if (args.length != NUM_ARGUMENTS) { + String message = "invalid arguments: " + Arrays.toString(args) + + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>"; + LOGGER.error(message); + outputStream.println(message); + return; + } + + this.hostName = args[0]; + this.port = Integer.parseInt(args[1]); + this.startFlag = "start".equalsIgnoreCase(args[2]); + this.period = Long.parseLong(args[3]); } /** @@ -56,13 +77,27 @@ public class PeriodicEventManager { * @throws ApexDeploymentException thrown on deployment and communication errors */ public void init() throws ApexDeploymentException { - engineServiceFacade.init(); + try { + // Use an engine service facade to handle model deployment + engineServiceFacade = new EngineServiceFacade(hostName, port); + engineServiceFacade.init(); + + if (startFlag) { + startPerioidicEvents(period); + } else { + stopPerioidicEvents(); + } + } catch (final ApexException e) { + LOGGER.error("model deployment failed on parameters {} {} {}", hostName, port, startFlag, e); + } finally { + close(); + } } /** * Close the EngDep connection to the Apex server. */ - public void close() { + private void close() { engineServiceFacade.close(); } @@ -72,7 +107,7 @@ public class PeriodicEventManager { * @param period the interval in milliseconds between periodic events * @throws ApexDeploymentException on messaging errors */ - public void startPerioidicEvents(final long period) throws ApexDeploymentException { + private void startPerioidicEvents(final long period) throws ApexDeploymentException { for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { engineServiceFacade.startPerioidicEvents(engineKey, period); } @@ -83,7 +118,7 @@ public class PeriodicEventManager { * * @throws ApexDeploymentException on messaging errors */ - public void stopPerioidicEvents() throws ApexDeploymentException { + private void stopPerioidicEvents() throws ApexDeploymentException { for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { engineServiceFacade.stopPerioidicEvents(engineKey); } @@ -94,31 +129,10 @@ public class PeriodicEventManager { * command line arguments. * * @param args the arguments that specify the Apex engine and the Apex model file + * @throws ApexDeploymentException on deployment errors */ - public static void main(final String[] args) { - if (args.length != NUM_ARGUMENTS) { - String message = "invalid arguments: " + Arrays.toString(args) - + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>"; - LOGGER.error(message); - return; - } - - PeriodicEventManager deployer = null; - try { - // Use a Deployer object to handle model deployment - deployer = new PeriodicEventManager(args[0], Integer.parseInt(args[1])); - deployer.init(); - if ("start".equalsIgnoreCase(args[2])) { - deployer.startPerioidicEvents(Long.parseLong(args[PERIODIC_EVENT_INTERVAL])); - } else { - deployer.stopPerioidicEvents(); - } - } catch (final ApexException e) { - LOGGER.error("model deployment failed on parameters {}", args, e); - } finally { - if (deployer != null) { - deployer.close(); - } - } + public static void main(final String[] args) throws ApexDeploymentException { + PeriodicEventManager peManager = new PeriodicEventManager(args, System.out); + peManager.init(); } } |