diff options
Diffstat (limited to 'core/core-deployment/src')
6 files changed, 471 insertions, 60 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java index d0af94930..0df0166df 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java @@ -25,6 +25,7 @@ import com.google.common.eventbus.Subscribe; import java.net.URI; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; @@ -38,10 +39,9 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class DeploymentClient handles the client side of an EngDep communication session with an - * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending - * queue to queue messages for sending by the client thread and a receiving queue to queue messages - * received from the Apex engine. + * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a + * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the + * client thread and a receiving queue to queue messages received from the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -49,6 +49,7 @@ public class DeploymentClient implements Runnable { private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class); private static final int CLIENT_STOP_WAIT_INTERVAL = 100; + private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50; // Host and port to use for EngDep messaging private String host = null; @@ -65,6 +66,10 @@ public class DeploymentClient implements Runnable { // Thread management fields private boolean started = false; private Thread thisThread = null; + + // Number of messages processed + private long messagesSent = 0; + private long messagesReceived = 0; /** * Instantiates a new deployment client. @@ -104,10 +109,18 @@ public class DeploymentClient implements Runnable { return; } // Loop forever, sending messages as they appear on the queue - while (started) { + while (started && !thisThread.isInterrupted()) { try { - final Message messageForSending = sendQueue.take(); - sendMessage(messageForSending); + final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS); + if (messageForSending == null) { + continue; + } + + // Send the message in its message holder + final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost()); + messageHolder.addMessage(messageForSending); + service.send(messageHolder); + messagesSent++; } catch (final InterruptedException e) { // Message sending has been interrupted, we are finished LOGGER.debug("engine<-->deployment client interrupted"); @@ -128,11 +141,7 @@ public class DeploymentClient implements Runnable { * @param message the message to send to the Apex server */ public void sendMessage(final Message message) { - final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost()); - - // Send the message in its message holder - messageHolder.addMessage(message); - service.send(messageHolder); + sendQueue.add(message); } /** @@ -146,7 +155,9 @@ public class DeploymentClient implements Runnable { ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL); // Close the Web Services connection - service.stopConnection(); + if (service != null) { + service.stopConnection(); + } started = false; LOGGER.debug("engine<-->deployment test client stopped . . ."); } @@ -170,11 +181,26 @@ public class DeploymentClient implements Runnable { } /** - * The listener interface for receiving deploymentClient events. The class that is interested in - * processing a deploymentClient event implements this interface, and the object created with - * that class is registered with a component using the component's - * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that - * object's appropriate method is invoked. + * Get the number of messages received by the client. + * @return the number of messages received by the client + */ + public long getMessagesReceived() { + return messagesReceived; + } + + /** + * Get the number of messages sent by the client. + * @return the number of messages sent by the client + */ + public long getMessagesSent() { + return messagesSent; + } + + /** + * The listener interface for receiving deploymentClient events. The class that is interested in processing a + * deploymentClient event implements this interface, and the object created with that class is registered with a + * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event + * occurs, that object's appropriate method is invoked. * * @see DeploymentClientEvent */ @@ -182,25 +208,24 @@ public class DeploymentClient implements Runnable { /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. - * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock) + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. policy.apex.core. + * infrastructure.messaging.impl.ws.messageblock. MessageBlock) */ @Subscribe @Override public void onMessage(final MessageBlock<Message> messageData) { + messagesReceived++; receiveQueue.addAll(messageData.getMessages()); } /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. - * String) + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String) */ @Override public void onMessage(final String messageString) { + messagesReceived++; throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol"); } } diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java index 5f0752aa0..533db0b17 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/PeriodicEventManager.java @@ -20,6 +20,7 @@ package org.onap.policy.apex.core.deployment; +import java.io.PrintStream; import java.util.Arrays; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; @@ -35,19 +36,39 @@ public class PeriodicEventManager { private static final XLogger LOGGER = XLoggerFactory.getXLogger(BatchDeployer.class); private static final int NUM_ARGUMENTS = 4; - private static final int PERIODIC_EVENT_INTERVAL = 3; // The facade that is handling messaging to the engine service private EngineServiceFacade engineServiceFacade = null; + // Host name and port of the Apex service + private String hostName; + private int port; + + // Should we start or stop periodic events + private boolean startFlag; + + // The period for periodic events + private long period; + /** - * Instantiates a new deployer. - * - * @param hostName the host name of the host running the Apex Engine - * @param port the port to use for EngDep communication with the Apex engine + * Instantiates a new periodic event manager. + * + * @param args the command parameters + * @param outputStream the output stream */ - public PeriodicEventManager(final String hostName, final int port) { - engineServiceFacade = new EngineServiceFacade(hostName, port); + public PeriodicEventManager(final String[] args, final PrintStream outputStream) { + if (args.length != NUM_ARGUMENTS) { + String message = "invalid arguments: " + Arrays.toString(args) + + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>"; + LOGGER.error(message); + outputStream.println(message); + return; + } + + this.hostName = args[0]; + this.port = Integer.parseInt(args[1]); + this.startFlag = "start".equalsIgnoreCase(args[2]); + this.period = Long.parseLong(args[3]); } /** @@ -56,13 +77,27 @@ public class PeriodicEventManager { * @throws ApexDeploymentException thrown on deployment and communication errors */ public void init() throws ApexDeploymentException { - engineServiceFacade.init(); + try { + // Use an engine service facade to handle model deployment + engineServiceFacade = new EngineServiceFacade(hostName, port); + engineServiceFacade.init(); + + if (startFlag) { + startPerioidicEvents(period); + } else { + stopPerioidicEvents(); + } + } catch (final ApexException e) { + LOGGER.error("model deployment failed on parameters {} {} {}", hostName, port, startFlag, e); + } finally { + close(); + } } /** * Close the EngDep connection to the Apex server. */ - public void close() { + private void close() { engineServiceFacade.close(); } @@ -72,7 +107,7 @@ public class PeriodicEventManager { * @param period the interval in milliseconds between periodic events * @throws ApexDeploymentException on messaging errors */ - public void startPerioidicEvents(final long period) throws ApexDeploymentException { + private void startPerioidicEvents(final long period) throws ApexDeploymentException { for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { engineServiceFacade.startPerioidicEvents(engineKey, period); } @@ -83,7 +118,7 @@ public class PeriodicEventManager { * * @throws ApexDeploymentException on messaging errors */ - public void stopPerioidicEvents() throws ApexDeploymentException { + private void stopPerioidicEvents() throws ApexDeploymentException { for (final AxArtifactKey engineKey : engineServiceFacade.getEngineKeyArray()) { engineServiceFacade.stopPerioidicEvents(engineKey); } @@ -94,31 +129,10 @@ public class PeriodicEventManager { * command line arguments. * * @param args the arguments that specify the Apex engine and the Apex model file + * @throws ApexDeploymentException on deployment errors */ - public static void main(final String[] args) { - if (args.length != NUM_ARGUMENTS) { - String message = "invalid arguments: " + Arrays.toString(args) - + "\nusage: Deployer <server address> <port address> <start/stop> <periods in ms>"; - LOGGER.error(message); - return; - } - - PeriodicEventManager deployer = null; - try { - // Use a Deployer object to handle model deployment - deployer = new PeriodicEventManager(args[0], Integer.parseInt(args[1])); - deployer.init(); - if ("start".equalsIgnoreCase(args[2])) { - deployer.startPerioidicEvents(Long.parseLong(args[PERIODIC_EVENT_INTERVAL])); - } else { - deployer.stopPerioidicEvents(); - } - } catch (final ApexException e) { - LOGGER.error("model deployment failed on parameters {}", args, e); - } finally { - if (deployer != null) { - deployer.close(); - } - } + public static void main(final String[] args) throws ApexDeploymentException { + PeriodicEventManager peManager = new PeriodicEventManager(args, System.out); + peManager.init(); } } diff --git a/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/ApexDeploymentExceptionTest.java b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/ApexDeploymentExceptionTest.java new file mode 100644 index 000000000..d87c6caa5 --- /dev/null +++ b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/ApexDeploymentExceptionTest.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.junit.Test; + +/** + * Test the Apex deployment Exception. + * + */ +public class ApexDeploymentExceptionTest { + + @Test + public void testDeploymentException() { + ApexDeploymentException ade0 = new ApexDeploymentException("a message"); + assertNotNull(ade0); + assertEquals("a message", ade0.getMessage()); + + ApexDeploymentException ade1 = new ApexDeploymentException("a message", new IOException()); + assertNotNull(ade1); + assertEquals("a message", ade0.getMessage()); + } +} diff --git a/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/DeploymentClientTest.java b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/DeploymentClientTest.java new file mode 100644 index 000000000..b12e526d7 --- /dev/null +++ b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/DeploymentClientTest.java @@ -0,0 +1,139 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyObject; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.core.protocols.Message; +import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus; +import org.onap.policy.apex.core.protocols.engdep.messages.Response; +import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; + +/** + * Test the deployment web socket client. + */ +@RunWith(MockitoJUnitRunner.class) +public class DeploymentClientTest { + @Mock + private static MessagingServiceFactory<Message> mockServiceFactory; + + @Mock + private static MessagingService<Message> mockService; + + @SuppressWarnings("rawtypes") + ArgumentCaptor<MessageListener> messageListener = ArgumentCaptor.forClass(MessageListener.class); + + @SuppressWarnings("unchecked") + @Test + public void testDeploymentClientStart() throws Exception { + DeploymentClient deploymentClient = new DeploymentClient("localhost", 51273); + + final Field factoryField = deploymentClient.getClass().getDeclaredField("factory"); + factoryField.setAccessible(true); + factoryField.set(deploymentClient, mockServiceFactory); + + Mockito.doReturn(mockService).when(mockServiceFactory).createClient(anyObject()); + + Mockito.doNothing().when(mockService).addMessageListener(messageListener.capture()); + Mockito.doNothing().when(mockService).startConnection(); + + Mockito.doNothing().when(mockService).send((MessageHolder<Message>) anyObject()); + + Thread clientThread = new Thread(deploymentClient); + clientThread.start(); + + ThreadUtilities.sleep(20); + + assertTrue(deploymentClient.isStarted()); + assertTrue(clientThread.isAlive()); + + AxArtifactKey engineKey = new AxArtifactKey("MyEngine", "0.0.1"); + GetEngineStatus getEngineStatus = new GetEngineStatus(engineKey); + deploymentClient.sendMessage(new GetEngineStatus(engineKey)); + + ThreadUtilities.sleep(20); + Response response = new Response(engineKey, true, getEngineStatus); + List<Message> messageList = new ArrayList<>(); + messageList.add(response); + + MessageBlock<Message> responseBlock = new MessageBlock<>(messageList, null); + messageListener.getValue().onMessage(responseBlock); + + try { + messageListener.getValue().onMessage("StringMessage"); + fail("test should throw an exception here"); + } catch (UnsupportedOperationException use) { + assertEquals("String mesages are not supported on the EngDep protocol", use.getMessage()); + } + + ThreadUtilities.sleep(300); + assertEquals(1, deploymentClient.getMessagesSent()); + assertEquals(2, deploymentClient.getMessagesReceived()); + + deploymentClient.stopClient(); + } + + @Test + public void testDeploymentClientStartException() throws Exception { + DeploymentClient depoymentClient = new DeploymentClient("localhost", 51273); + + final Field factoryField = depoymentClient.getClass().getDeclaredField("factory"); + factoryField.setAccessible(true); + factoryField.set(depoymentClient, mockServiceFactory); + + Mockito.doReturn(mockService).when(mockServiceFactory).createClient(anyObject()); + + Mockito.doNothing().when(mockService).addMessageListener(anyObject()); + Mockito.doThrow(new ApexRuntimeException("connection start failed")).when(mockService).startConnection(); + + Thread clientThread = new Thread(depoymentClient); + clientThread.start(); + + ThreadUtilities.sleep(50); + + assertFalse(depoymentClient.isStarted()); + assertFalse(clientThread.isAlive()); + assertEquals(0, depoymentClient.getReceiveQueue().size()); + + ThreadUtilities.sleep(100); + } +} diff --git a/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/PeriodicEventManagerTest.java b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/PeriodicEventManagerTest.java new file mode 100644 index 000000000..fb204c3a3 --- /dev/null +++ b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/PeriodicEventManagerTest.java @@ -0,0 +1,145 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.PrintStream; + +import org.junit.Test; + +/** + * Test the periodic event manager utility. + */ +public class PeriodicEventManagerTest { + @Test + public void testPeroidicEventManager() { + try { + final String[] EventArgs = + { "-h" }; + + PeriodicEventManager.main(EventArgs); + } catch (Exception exc) { + fail("test should not throw an exception"); + } + } + + @Test + public void testPeroidicEventManagerNoOptions() { + final String[] EventArgs = new String[] + {}; + + final String outputString = runPeriodicEventManager(EventArgs); + + assertTrue(outputString + .contains("usage: Deployer <server address> <port address> <start/stop> <periods in ms>")); + } + + @Test + public void testPeroidicEventManagerBadOptions() { + final String[] EventArgs = + { "-zabbu" }; + + final String outputString = runPeriodicEventManager(EventArgs); + + assertTrue(outputString + .contains("usage: Deployer <server address> <port address> <start/stop> <periods in ms>")); + } + + @Test + public void testPeroidicEventManagerNonNumeric3() { + final String[] EventArgs = + { "aaa", "bbb", "ccc", "ddd" }; + + try { + runPeriodicEventManager(EventArgs); + } catch (NumberFormatException nfe) { + assertEquals("For input string: \"bbb\"", nfe.getMessage()); + } + } + + @Test + public void testPeroidicEventManagerNonNumeric2() { + final String[] EventArgs = + { "aaa", "12345", "ccc", "1000" }; + + try { + runPeriodicEventManager(EventArgs); + } catch (NumberFormatException nfe) { + assertEquals("For input string: \"ddd\"", nfe.getMessage()); + } + } + + @Test + public void testPeroidicEventManagerStart() { + final String[] EventArgs = + { "localhost", "12345", "start", "1000" }; + + final String outputString = runPeriodicEventManager(EventArgs); + + assertTrue(outputString.contains("\n*** StdErr ***\n\n*** exception ***")); + } + + + @Test + public void testPeroidicEventManagerStop() { + final String[] EventArgs = + { "localhost", "12345", "stop", "1000" }; + + final String outputString = runPeriodicEventManager(EventArgs); + + assertTrue(outputString.contains("\n*** StdErr ***\n\n*** exception ***")); + } + + /** + * Run the application. + * + * @param eventArgs the command arguments + * @return a string containing the command output + */ + private String runPeriodicEventManager(final String[] eventArgs) { + final ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + final ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + + PeriodicEventManager peManager = new PeriodicEventManager(eventArgs, new PrintStream(baosOut, true)); + + String exceptionString = ""; + try { + peManager.init(); + } catch (ApexDeploymentException ade) { + exceptionString = ade.getCascadedMessage(); + } + + InputStream testInput = new ByteArrayInputStream("Test Data for Input to WS".getBytes()); + System.setIn(testInput); + + String outString = baosOut.toString(); + String errString = baosErr.toString(); + + return "*** StdOut ***\n" + outString + "\n*** StdErr ***\n" + errString + "\n*** exception ***\n" + + exceptionString; + } +} diff --git a/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/TestMessageListener.java b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/TestMessageListener.java new file mode 100644 index 000000000..8bb1c7e37 --- /dev/null +++ b/core/core-deployment/src/test/java/org/onap/policy/apex/core/deployment/TestMessageListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.deployment; + +import static org.junit.Assert.fail; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.onap.policy.apex.core.protocols.Message; + +/** + * A test message listener. + */ +public class TestMessageListener implements MessageListener<Message> { + @Override + public void onMessage(String messageString) { + fail("Message should not be received"); + } + + @Override + public void onMessage(MessageBlock<Message> data) { + fail("Message should not be received"); + } +} |