summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java17
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java40
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java30
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java22
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java46
5 files changed, 80 insertions, 75 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
index a96517510..878882d6b 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
@@ -196,7 +196,7 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn
@Override
public void run() {
// JMS session and message consumer for receiving messages
- try (Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+ try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
// Create a message consumer for reception of messages and set this class as a message listener
createMessageConsumer(jmsSession);
} catch (final Exception e) {
@@ -209,20 +209,21 @@ public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runn
LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: "
+ jmsConsumerProperties.getConsumerTopic());
}
- // The endless loop that receives events over JMS
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
- }
}
/**
* The helper function to create a message consumer from a given JMS session
- *
+ *
* @param jmsSession a JMS session
*/
- private void createMessageConsumer(Session jmsSession) {
- try (MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
+ private void createMessageConsumer(final Session jmsSession) {
+ try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) {
messageConsumer.setMessageListener(this);
+
+ // The endless loop that receives events over JMS
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
+ }
} catch (final Exception e) {
final String errorMessage = "failed to create a JMS message consumer for receiving messages";
LOGGER.warn(errorMessage, e);
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java
index e5ddbfee5..8b9f2a112 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventProducer.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -32,11 +32,15 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.onap.policy.apex.apps.uservice.test.adapt.events.EventGenerator;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class JMSEventProducer implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventProducer.class);
+
private final String topic;
private final int eventCount;
private final boolean sendObjects;
@@ -48,8 +52,9 @@ public class JMSEventProducer implements Runnable {
private boolean stopFlag = false;
private final Connection connection;
- public JMSEventProducer(String topic, ConnectionFactory connectionFactory, String username, String password,
- final int eventCount, final boolean sendObjects, final long eventInterval) throws JMSException {
+ public JMSEventProducer(final String topic, final ConnectionFactory connectionFactory, final String username,
+ final String password, final int eventCount, final boolean sendObjects, final long eventInterval)
+ throws JMSException {
this.topic = topic;
this.eventCount = eventCount;
this.sendObjects = sendObjects;
@@ -63,10 +68,9 @@ public class JMSEventProducer implements Runnable {
@Override
public void run() {
- try {
- final Topic jmsTopic = new ActiveMQTopic(topic);
- final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);
+ final Topic jmsTopic = new ActiveMQTopic(topic);
+ try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer jmsProducer = jmsSession.createProducer(jmsTopic);) {
while (producerThread.isAlive() && !stopFlag) {
ThreadUtilities.sleep(50);
@@ -77,8 +81,6 @@ public class JMSEventProducer implements Runnable {
}
}
- jmsProducer.close();
- jmsSession.close();
} catch (final Exception e) {
throw new ApexEventRuntimeException("JMS event consumption failed", e);
}
@@ -89,12 +91,11 @@ public class JMSEventProducer implements Runnable {
}
private void sendEventsToTopic(final Session jmsSession, final MessageProducer jmsProducer) throws JMSException {
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": sending events to JMS server, event count "
- + eventCount);
+
+ LOGGER.info("{} : sending events to JMS server, event count {}", this.getClass().getCanonicalName(),
+ eventCount);
for (int i = 0; i < eventCount; i++) {
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
- + " milliseconds before sending next event");
ThreadUtilities.sleep(eventInterval);
Message jmsMessage = null;
@@ -105,9 +106,8 @@ public class JMSEventProducer implements Runnable {
}
jmsProducer.send(jmsMessage);
eventsSentCount++;
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": sent event " + jmsMessage.toString());
}
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": completed");
+ LOGGER.info("{} : completed, number of events sent", this.getClass().getCanonicalName(), eventsSentCount);
}
public long getEventsSentCount() {
@@ -115,15 +115,13 @@ public class JMSEventProducer implements Runnable {
}
public void shutdown() {
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopping");
-
+ LOGGER.info("{} : stopping", this.getClass().getCanonicalName());
stopFlag = true;
while (producerThread.isAlive()) {
ThreadUtilities.sleep(10);
}
-
- System.out.println(JMSEventProducer.class.getCanonicalName() + ": stopped");
+ LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
}
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java
index 46455f5df..916216a5a 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/JMSEventSubscriber.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -34,11 +34,15 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class JMSEventSubscriber implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JMSEventSubscriber.class);
+
private final String topic;
private long eventsReceivedCount = 0;
@@ -58,13 +62,9 @@ public class JMSEventSubscriber implements Runnable {
@Override
public void run() {
- try {
- final Topic jmsTopic = new ActiveMQTopic(topic);
- final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic);
-
- System.out.println(JMSEventSubscriber.class.getCanonicalName()
- + ": receiving events from Kafka server on topic " + topic);
+ final Topic jmsTopic = new ActiveMQTopic(topic);
+ try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer jmsConsumer = jmsSession.createConsumer(jmsTopic);) {
while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
try {
@@ -75,11 +75,9 @@ public class JMSEventSubscriber implements Runnable {
if (message instanceof ObjectMessage) {
final TestPing testPing = (TestPing) ((ObjectMessage) message).getObject();
- System.out.println("Received message: " + testPing.toString());
testPing.verify();
} else if (message instanceof TextMessage) {
- final String textMessage = ((TextMessage) message).getText();
- System.out.println("Received message: " + textMessage);
+ ((TextMessage) message).getText();
} else {
throw new ApexEventException("unknowm message \"" + message + "\" of type \""
+ message.getClass().getCanonicalName() + "\" received");
@@ -90,13 +88,11 @@ public class JMSEventSubscriber implements Runnable {
}
}
- jmsConsumer.close();
- jmsSession.close();
} catch (final Exception e) {
throw new ApexEventRuntimeException("JMS event consumption failed", e);
}
- System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": event reception completed");
+ LOGGER.info("{} : event reception completed", this.getClass().getCanonicalName());
}
public long getEventsReceivedCount() {
@@ -111,7 +107,7 @@ public class JMSEventSubscriber implements Runnable {
}
connection.close();
- System.out.println(JMSEventSubscriber.class.getCanonicalName() + ": stopped");
+ LOGGER.info("{} : stopped", this.getClass().getCanonicalName());
}
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java
index 36d689bda..de3c8d37b 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestContext.java
@@ -5,21 +5,27 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.policy.apex.apps.uservice.test.adapt.jms;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.HOST;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_IN;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.JMS_TOPIC_APEX_OUT;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.PORT;
+import static org.onap.policy.apex.apps.uservice.test.adapt.jms.TestJMS2JMS.connectionFactory;
+
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
@@ -48,11 +54,11 @@ public class TestContext implements Context {
testProperties = new Properties();
final Map<String, Object> params = new HashMap<String, Object>();
- params.put("host", "localhost");
- params.put("port", "5445");
- testProperties.put("ConnectionFactory", TestJMS2JMS.connectionFactory);
- testProperties.put("jms/topic/apexIn", new ActiveMQTopic("jms/topic/apexIn"));
- testProperties.put("jms/topic/apexOut", new ActiveMQTopic("jms/topic/apexOut"));
+ params.put("host", HOST);
+ params.put("port", PORT);
+ testProperties.put("ConnectionFactory", connectionFactory);
+ testProperties.put(JMS_TOPIC_APEX_IN, new ActiveMQTopic(JMS_TOPIC_APEX_IN));
+ testProperties.put(JMS_TOPIC_APEX_OUT, new ActiveMQTopic(JMS_TOPIC_APEX_OUT));
} catch (final Exception e) {
e.printStackTrace();
throw new ApexRuntimeException("Context initiation failed", e);
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java
index ca23292a9..f021b2674 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/jms/TestJMS2JMS.java
@@ -5,22 +5,22 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.policy.apex.apps.uservice.test.adapt.jms;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.file.Paths;
@@ -45,13 +45,17 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
public class TestJMS2JMS {
+ public static final String PORT = "5445";
+ public static final String HOST = "localhost";
+ public static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn";
+ public static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut";
+
+ private static final int SLEEP_TIME = 1500;
private static final String GROUP_ROLE = "guests";
private static final String PACKAGE_NAME = "org.onap.policy.apex.apps.uservice.test.adapt.jms";
private static final String USERNAME = "guest";
private static final String PASSWORD = "IAmAGuest";
- private static final String JMS_TOPIC_APEX_IN = "jms/topic/apexIn";
- private static final String JMS_TOPIC_APEX_OUT = "jms/topic/apexOut";
- private static final String URL = "tcp://localhost:5445";
+ private static final String URL = "tcp://" + HOST + ":" + PORT;
private static final String DATA_PARENT_DIR = Paths.get("target", "activemq-data").toString();
@@ -70,9 +74,7 @@ public class TestJMS2JMS {
public static void setupEmbeddedJMSServer() throws Exception {
final ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
final BrokerPlugin authenticationPlugin = getAuthenticationBrokerPlugin();
- if (authenticationPlugin != null) {
- plugins.add(authenticationPlugin);
- }
+ plugins.add(authenticationPlugin);
broker = new BrokerService();
broker.setUseJmx(false);
@@ -108,13 +110,13 @@ public class TestJMS2JMS {
@Test
public void testJMSObjectEvents() throws ApexException, JMSException {
- final String[] args = {"src/test/resources/prodcons/JMS2JMSObjectEvent.json"};
+ final String[] args = { "src/test/resources/prodcons/JMS2JMSObjectEvent.json" };
testJMSEvents(args, true);
}
@Test
public void testJMSJsonEvents() throws ApexException, JMSException {
- final String[] args = {"src/test/resources/prodcons/JMS2JMSJsonEvent.json"};
+ final String[] args = { "src/test/resources/prodcons/JMS2JMSJsonEvent.json" };
testJMSEvents(args, false);
}
@@ -131,20 +133,22 @@ public class TestJMS2JMS {
final long testStartTime = System.currentTimeMillis();
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
+ while (isTimedOut(testStartTime) && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
ThreadUtilities.sleep(EVENT_INTERVAL);
}
- ThreadUtilities.sleep(1000);
-
- System.out.println("sent event count: " + producer.getEventsSentCount());
- System.out.println("received event count: " + subscriber.getEventsReceivedCount());
- assertTrue(subscriber.getEventsReceivedCount() == producer.getEventsSentCount());
-
+ ThreadUtilities.sleep(SLEEP_TIME);
apexMain.shutdown();
subscriber.shutdown();
producer.shutdown();
- ThreadUtilities.sleep(1000);
+ ThreadUtilities.sleep(SLEEP_TIME);
+
+ assertEquals(EVENT_COUNT, producer.getEventsSentCount());
+ assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
+
+ }
+
+ private boolean isTimedOut(final long testStartTime) {
+ return System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH;
}
}