diff options
5 files changed, 80 insertions, 75 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java index a96517510..878882d6b 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java @@ -196,7 +196,7 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn @Override public void run() { // JMS session and message consumer for receiving messages - try (Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { // Create a message consumer for reception of messages and set this class as a message listener createMessageConsumer(jmsSession); } catch (final Exception e) { @@ -209,20 +209,21 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: " + jmsConsumerProperties.getConsumerTopic()); } - // The endless loop that receives events over JMS - while (consumerThread.isAlive() && !stopOrderedFlag) { - ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); - } } /** * The helper function to create a message consumer from a given JMS session - * + * * @param jmsSession a JMS session */ - private void createMessageConsumer(Session jmsSession) { - try (MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) { + private void createMessageConsumer(final Session jmsSession) { + try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) { messageConsumer.setMessageListener(this); + + // The endless loop that receives events over JMS + while (consumerThread.isAlive() && !stopOrderedFlag) { + ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); + } } catch (final Exception e) { final String errorMessage = "failed to create a JMS message consumer for receiving messages"; LOGGER.warn(errorMessage, e); diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java index e5ddbfee5..8b9f2a112 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java @@ -5,15 +5,15 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -32,11 +32,15 @@ import org.apache.activemq.command.ActiveMQTopic; import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Liam Fallon (liam.fallon@ericsson.com) */ public class JMSEventProducer implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventProducer.class); + private final String topic; private final int eventCount; private final boolean sendObjects; @@ -48,8 +52,9 @@ public class JMSEventProducer implements Runnable { private boolean stopFlag = false; private final Connection connection; - public JMSEventProducer(String topic, ConnectionFactory connectionFactory, String username, String password, - final int eventCount, final boolean sendObjects, final long eventInterval) throws JMSException { + public JMSEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username, + final String password, final int eventCount, final boolean sendObjects, final long eventInterval) + throws JMSException { this.topic = topic; this.eventCount = eventCount; this.sendObjects = sendObjects; @@ -63,10 +68,9 @@ public class JMSEventProducer implements Runnable { @Override public void run() { - try { - final Topic jmsTopic = new ActiveMQTopic(topic); - final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic); + final Topic jmsTopic = new ActiveMQTopic(topic); + try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) { while (producerThread.isAlive() && !stopFlag) { ThreadUtilities.sleep(50); @@ -77,8 +81,6 @@ public class JMSEventProducer implements Runnable { } } - jmsProducer.close(); - jmsSession.close(); } catch (final Exception e) { throw new ApexEventRuntimeException("JMS event consumption failed", e); } @@ -89,12 +91,11 @@ public class JMSEventProducer implements Runnable { } private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException { - System.out.println(JMSEventProducer.class.getCanonicalName() + ": sending events to JMS server, event count " - + eventCount); + + LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(), + eventCount); for (int i = 0; i < eventCount; i++) { - System.out.println(JMSEventProducer.class.getCanonicalName() + ": waiting " + eventInterval - + " milliseconds before sending next event"); ThreadUtilities.sleep(eventInterval); Message jmsMessage = null; @@ -105,9 +106,8 @@ public class JMSEventProducer implements Runnable { } jmsProducer.send(jmsMessage); eventsSentCount++; - System.out.println(JMSEventProducer.class.getCanonicalName() + ": sent event " + jmsMessage.toString()); } - System.out.println(JMSEventProducer.class.getCanonicalName() + ": completed"); + LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount); } public long getEventsSentCount() { @@ -115,15 +115,13 @@ public class JMSEventProducer implements Runnable { } public void shutdown() { - System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopping"); - + LOGGER.info("{} : stopping", this.getClass().getCanonicalName()); stopFlag = true; while (producerThread.isAlive()) { ThreadUtilities.sleep(10); } - - System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopped"); + LOGGER.info("{} : stopped", this.getClass().getCanonicalName()); } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java index 46455f5df..916216a5a 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java @@ -5,15 +5,15 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -34,11 +34,15 @@ import org.apache.activemq.command.ActiveMQTopic; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Liam Fallon (liam.fallon@ericsson.com) */ public class JMSEventSubscriber implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventSubscriber.class); + private final String topic; private long eventsReceivedCount = 0; @@ -58,13 +62,9 @@ public class JMSEventSubscriber implements Runnable { @Override public void run() { - try { - final Topic jmsTopic = new ActiveMQTopic(topic); - final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic); - - System.out.println(JMSEventSubscriber.class.getCanonicalName() - + ": receiving events from Kafka server on topic " + topic); + final Topic jmsTopic = new ActiveMQTopic(topic); + try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic);) { while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) { try { @@ -75,11 +75,9 @@ public class JMSEventSubscriber implements Runnable { if (message instanceof ObjectMessage) { final TestPing testPing = (TestPing) ((ObjectMessage) message).getObject(); - System.out.println("Received message: " + testPing.toString()); testPing.verify(); } else if (message instanceof TextMessage) { - final String textMessage = ((TextMessage) message).getText(); - System.out.println("Received message: " + textMessage); + ((TextMessage) message).getText(); } else { throw new ApexEventException("unknowm message \"" + message + "\" of type \"" + message.getClass().getCanonicalName() + "\" received"); @@ -90,13 +88,11 @@ public class JMSEventSubscriber implements Runnable { } } - jmsConsumer.close(); - jmsSession.close(); } catch (final Exception e) { throw new ApexEventRuntimeException("JMS event consumption failed", e); } - System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": event reception completed"); + LOGGER.info("{} : event reception completed", this.getClass().getCanonicalName()); } public long getEventsReceivedCount() { @@ -111,7 +107,7 @@ public class JMSEventSubscriber implements Runnable { } connection.close(); - System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": stopped"); + LOGGER.info("{} : stopped", this.getClass().getCanonicalName()); } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java index 36d689bda..de3c8d37b 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java @@ -5,21 +5,27 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.policy.apex.apps.uservice.test.adapt.jms; +import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.HOST; +import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_IN; +import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_OUT; +import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.PORT; +import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.connectionFactory; + import java.util.HashMap; import java.util.Hashtable; import java.util.Map; @@ -48,11 +54,11 @@ public class TestContext implements Context { testProperties = new Properties(); final Map<String, Object> params = new HashMap<String, Object>(); - params.put("host", "localhost"); - params.put("port", "5445"); - testProperties.put("ConnectionFactory", TestJMS2JMS.connectionFactory); - testProperties.put("jms/topic/apexIn", new ActiveMQTopic("jms/topic/apexIn")); - testProperties.put("jms/topic/apexOut", new ActiveMQTopic("jms/topic/apexOut")); + params.put("host", HOST); + params.put("port", PORT); + testProperties.put("ConnectionFactory", connectionFactory); + testProperties.put(JMS_TOPIC_APEX_IN, new ActiveMQTopic(JMS_TOPIC_APEX_IN)); + testProperties.put(JMS_TOPIC_APEX_OUT, new ActiveMQTopic(JMS_TOPIC_APEX_OUT)); } catch (final Exception e) { e.printStackTrace(); throw new ApexRuntimeException("Context initiation failed", e); diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java index ca23292a9..f021b2674 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java @@ -5,22 +5,22 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.policy.apex.apps.uservice.test.adapt.jms; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.nio.file.Paths; @@ -45,13 +45,17 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; public class TestJMS2JMS { + public static final String PORT = "5445"; + public static final String HOST = "localhost"; + public static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn"; + public static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut"; + + private static final int SLEEP_TIME = 1500; private static final String GROUP_ROLE = "guests"; private static final String PACKAGE_NAME = "org.onap.policy.apex.apps.uservice.test.adapt.jms"; private static final String USERNAME = "guest"; private static final String PASSWORD = "IAmAGuest"; - private static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn"; - private static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut"; - private static final String URL = "tcp://localhost:5445"; + private static final String URL = "tcp://" + HOST + ":" + PORT; private static final String DATA_PARENT_DIR = Paths.get("target", "activemq-data").toString(); @@ -70,9 +74,7 @@ public class TestJMS2JMS { public static void setupEmbeddedJMSServer() throws Exception { final ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>(); final BrokerPlugin authenticationPlugin = getAuthenticationBrokerPlugin(); - if (authenticationPlugin != null) { - plugins.add(authenticationPlugin); - } + plugins.add(authenticationPlugin); broker = new BrokerService(); broker.setUseJmx(false); @@ -108,13 +110,13 @@ public class TestJMS2JMS { @Test public void testJMSObjectEvents() throws ApexException, JMSException { - final String[] args = {"src/test/resources/prodcons/JMS2JMSObjectEvent.json"}; + final String[] args = { "src/test/resources/prodcons/JMS2JMSObjectEvent.json" }; testJMSEvents(args, true); } @Test public void testJMSJsonEvents() throws ApexException, JMSException { - final String[] args = {"src/test/resources/prodcons/JMS2JMSJsonEvent.json"}; + final String[] args = { "src/test/resources/prodcons/JMS2JMSJsonEvent.json" }; testJMSEvents(args, false); } @@ -131,20 +133,22 @@ public class TestJMS2JMS { final long testStartTime = System.currentTimeMillis(); - while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && subscriber.getEventsReceivedCount() < EVENT_COUNT) { + while (isTimedOut(testStartTime) && subscriber.getEventsReceivedCount() < EVENT_COUNT) { ThreadUtilities.sleep(EVENT_INTERVAL); } - ThreadUtilities.sleep(1000); - - System.out.println("sent event count: " + producer.getEventsSentCount()); - System.out.println("received event count: " + subscriber.getEventsReceivedCount()); - assertTrue(subscriber.getEventsReceivedCount() == producer.getEventsSentCount()); - + ThreadUtilities.sleep(SLEEP_TIME); apexMain.shutdown(); subscriber.shutdown(); producer.shutdown(); - ThreadUtilities.sleep(1000); + ThreadUtilities.sleep(SLEEP_TIME); + + assertEquals(EVENT_COUNT, producer.getEventsSentCount()); + assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount()); + + } + + private boolean isTimedOut(final long testStartTime) { + return System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH; } } |