aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml54
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java295
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java289
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java368
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java27
5 files changed, 1033 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml
new file mode 100644
index 000000000..61abf52b4
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml
@@ -0,0 +1,54 @@
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (C) 2018 Ericsson. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ SPDX-License-Identifier: Apache-2.0
+ ============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onap.policy.apex-pdp.plugins.plugins-event.plugins-event-carrier</groupId>
+ <artifactId>plugins-event-carrier</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>plugins-event-carrier-jms</artifactId>
+ <name>${project.artifactId}</name>
+ <description>[${project.parent.artifactId}] Plugin for handling events being transported over JMS</description>
+
+ <properties>
+ <apex-plugins-event-carrier-jms-dir>${project.basedir}/src</apex-plugins-event-carrier-jms-dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-remote-naming</artifactId>
+ <version>2.0.4.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.xnio</groupId>
+ <artifactId>xnio-nio</artifactId>
+ <version>3.2.0.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms-client</artifactId>
+ <version>2.3.25.Final</version>
+ </dependency>
+ </dependencies>
+</project> \ No newline at end of file
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
new file mode 100644
index 000000000..745a1e98c
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java
@@ -0,0 +1,295 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.jms;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an Apex event consumer that receives events using JMS.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runnable {
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSConsumer.class);
+
+ // The Apex and JMS parameters read from the parameter service
+ private JMSCarrierTechnologyParameters jmsConsumerProperties;
+
+ // The event receiver that will receive events from this consumer
+ private ApexEventReceiver eventReceiver;
+
+ // The consumer thread and stopping flag
+ private Thread consumerThread;
+ private boolean stopOrderedFlag = false;
+
+ // The connection to the JMS server
+ private Connection connection;
+
+ // The topic on which we receive events from JMS
+ private Topic jmsIncomingTopic;
+
+ // The name for this consumer
+ private String name = null;
+
+ // The peer references for this event handler
+ private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
+
+ @Override
+ public void init(final String consumerName, final EventHandlerParameters consumerParameters,
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ this.eventReceiver = incomingEventReceiver;
+
+ this.name = consumerName;
+
+ // Check and get the JMS Properties
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) {
+ final String errorMessage = "specified consumer properties of type \""
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
+ + "\" are not applicable to a JMS consumer";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ jmsConsumerProperties = (JMSCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+
+ // Look up the JMS connection factory
+ InitialContext jmsContext = null;
+ ConnectionFactory connectionFactory = null;
+ try {
+ jmsContext = new InitialContext(jmsConsumerProperties.getJMSConsumerProperties());
+ connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory());
+
+ // Check if we actually got a connection factory
+ if (connectionFactory == null) {
+ throw new NullPointerException(
+ "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null");
+ }
+ } catch (final Exception e) {
+ final String errorMessage = "lookup of JMS connection factory \""
+ + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \""
+ + jmsConsumerProperties.getJMSConsumerProperties() + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Lookup the topic on which we will receive events
+ try {
+ jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic());
+
+ // Check if we actually got a topic
+ if (jmsIncomingTopic == null) {
+ throw new NullPointerException(
+ "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null");
+ }
+ } catch (final Exception e) {
+ final String errorMessage = "lookup of JMS topic \"" + jmsConsumerProperties.getConsumerTopic()
+ + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJMSConsumerProperties()
+ + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Create and start a connection to the JMS server
+ try {
+ connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(),
+ jmsConsumerProperties.getSecurityCredentials());
+ connection.start();
+ } catch (final Exception e) {
+ final String errorMessage = "connection to the JMS server failed for JMS properties \""
+ + jmsConsumerProperties.getJMSConsumerProperties() + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
+ */
+ @Override
+ public void start() {
+ // Configure and start the event reception thread
+ final String threadName = this.getClass().getName() + ":" + this.name;
+ consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service.
+ * parameters. eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service.
+ * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // JMS session and message consumer for receiving messages
+ Session jmsSession = null;
+ MessageConsumer messageConsumer = null;
+
+ // Create a session to the JMS server
+ try {
+ jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ } catch (final Exception e) {
+ final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ // Create a message consumer for reception of messages and set this class as a message listener
+ try {
+ messageConsumer = jmsSession.createConsumer(jmsIncomingTopic);
+ messageConsumer.setMessageListener(this);
+ } catch (final Exception e) {
+ final String errorMessage = "failed to create a JMS message consumer for receiving messages";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ // Everything is now set up
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: "
+ + jmsConsumerProperties.getConsumerTopic());
+ }
+
+ // The endless loop that receives events over JMS
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
+ }
+
+ // Close the message consumer
+ try {
+ messageConsumer.close();
+ } catch (final Exception e) {
+ final String errorMessage = "failed to close the JMS message consumer for receiving messages";
+ LOGGER.warn(errorMessage, e);
+ }
+
+ // Close the session
+ try {
+ jmsSession.close();
+ } catch (final Exception e) {
+ final String errorMessage = "failed to close the JMS session for receiving messages";
+ LOGGER.warn(errorMessage, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+ */
+ @Override
+ public void onMessage(final Message jmsMessage) {
+ try {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
+ this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(),
+ jmsMessage.getJMSType());
+ }
+
+ eventReceiver.receiveEvent(jmsMessage);
+ } catch (final Exception e) {
+ final String errorMessage = "failed to receive message from JMS";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ stopOrderedFlag = true;
+
+ while (consumerThread.isAlive()) {
+ ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime());
+ }
+
+ // Close the connection to the JMS server
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (final Exception e) {
+ final String errorMessage = "close of connection to the JMS server failed";
+ LOGGER.warn(errorMessage, e);
+ }
+ }
+
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java
new file mode 100644
index 000000000..017f07f6f
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java
@@ -0,0 +1,289 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.jms;
+
+import java.io.Serializable;
+import java.util.EnumMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProducer;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concrete implementation of an Apex event producer that sends events using JMS.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexJMSProducer implements ApexEventProducer {
+
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class);
+
+ // The JMS parameters read from the parameter service
+ private JMSCarrierTechnologyParameters jmsProducerProperties;
+
+ // The connection to the JMS server
+ private Connection connection;
+
+ // The topic on which we send events to JMS
+ private Topic jmsOutgoingTopic;
+
+ // The JMS session on which we will send events
+ private Session jmsSession;
+
+ // The producer on which we will send events
+ private MessageProducer messageProducer;
+
+ // The name for this producer
+ private String name = null;
+
+ // The peer references for this event handler
+ private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
+ * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
+ */
+ @Override
+ public void init(final String producerName, final EventHandlerParameters producerParameters)
+ throws ApexEventException {
+ this.name = producerName;
+
+ // Check and get the JMS Properties
+ if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) {
+ LOGGER.warn("specified producer properties are not applicable to a JMS producer (" + this.name + ")");
+ throw new ApexEventException(
+ "specified producer properties are not applicable to a JMS producer (" + this.name + ")");
+ }
+ jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+
+ // Look up the JMS connection factory
+ InitialContext jmsContext = null;
+ ConnectionFactory connectionFactory = null;
+ try {
+ jmsContext = new InitialContext(jmsProducerProperties.getJMSProducerProperties());
+ connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory());
+
+ // Check if we actually got a connection factory
+ if (connectionFactory == null) {
+ throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory()
+ + "\" returned null for producer (" + this.name + ")");
+ }
+ } catch (final Exception e) {
+ final String errorMessage = "lookup of JMS connection factory \""
+ + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \""
+ + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Lookup the topic on which we will send events
+ try {
+ jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic());
+
+ // Check if we actually got a topic
+ if (jmsOutgoingTopic == null) {
+ throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic()
+ + "\" returned null for producer (" + this.name + ")");
+ }
+ } catch (final Exception e) {
+ final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic()
+ + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJMSProducerProperties()
+ + "\" for producer (" + this.name + ")";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Create and start a connection to the JMS server
+ try {
+ connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(),
+ jmsProducerProperties.getSecurityCredentials());
+ connection.start();
+ } catch (final Exception e) {
+ final String errorMessage = "connection to JMS server failed for JMS properties \""
+ + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Create a JMS session for sending events
+ try {
+ jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ } catch (final Exception e) {
+ final String errorMessage = "creation of session to JMS server failed for JMS properties \""
+ + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Create a JMS message producer for sending events
+ try {
+ messageProducer = jmsSession.createProducer(jmsOutgoingTopic);
+ } catch (final Exception e) {
+ final String errorMessage =
+ "creation of producer for sending events to JMS server failed for JMS properties \""
+ + jmsProducerProperties.getJMSConsumerProperties() + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
+ * parameters. eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
+ * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String,
+ * java.lang.Object)
+ */
+ @Override
+ public void sendEvent(final long executionId, final String eventname, final Object eventObject) {
+ // Check if this is a synchronized event, if so we have received a reply
+ final SynchronousEventCache synchronousEventCache =
+ (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
+ if (synchronousEventCache != null) {
+ synchronousEventCache.removeCachedEventToApexIfExists(executionId);
+ }
+
+ // Check if the object to be sent is serializable
+ if (!Serializable.class.isAssignableFrom(eventObject.getClass())) {
+ final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
+ + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName()
+ + "\" is not serializable";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // The JMS message to send is constructed using the JMS session
+ Message jmsMessage = null;
+
+ // Check the type of JMS message to send
+ if (jmsProducerProperties.isObjectMessageSending()) {
+ // We should send a JMS Object Message
+ try {
+ jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject);
+ } catch (final Exception e) {
+ final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
+ + this.name + ", could not create JMS Object Message for object \"" + eventObject;
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ } else {
+ // We should send a JMS Text Message
+ try {
+ jmsMessage = jmsSession.createTextMessage(eventObject.toString());
+ } catch (final Exception e) {
+ final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
+ + this.name + ", could not create JMS Text Message for object \"" + eventObject;
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ }
+
+ try {
+ messageProducer.send(jmsMessage);
+ } catch (final Exception e) {
+ final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer "
+ + this.name + ", send failed for object \"" + eventObject;
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ // Close the message producer
+ try {
+ messageProducer.close();
+ } catch (final Exception e) {
+ final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages";
+ LOGGER.warn(errorMessage, e);
+ }
+
+ // Close the session
+ try {
+ jmsSession.close();
+ } catch (final Exception e) {
+ final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages";
+ LOGGER.warn(errorMessage, e);
+ }
+
+ // Close the connection to the JMS server
+ try {
+ connection.close();
+ } catch (final Exception e) {
+ final String errorMessage = "close of connection to the JMS server for " + this.name + " failed";
+ LOGGER.warn(errorMessage, e);
+ }
+ }
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java
new file mode 100644
index 000000000..48f5fe267
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java
@@ -0,0 +1,368 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.jms;
+
+import java.util.Properties;
+
+import javax.naming.Context;
+
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+
+/**
+ * Apex parameters for JMS as an event carrier technology.
+ * <p>
+ * The parameters for this plugin are:
+ * <ol>
+ * <li>initialContextFactory: JMS uses a naming {@link Context} object to look up the locations of JMS servers and JMS
+ * topics. An Initial Context Factory is used to when creating a {@link Context} object that can be used for JMS
+ * lookups. The value of this parameter is passed to the {@link Context} with the label
+ * {@link Context#INITIAL_CONTEXT_FACTORY}. Its value must be the full canonical path to a class that implements the
+ * {@code javax.naming.spi.InitialContextFactory} interface. The parameter defaults to the string value
+ * {@code org.jboss.naming.remote.client.InitialContextFactory}.
+ * <li>providerURL: The location of the server to use for naming context lookups. The value of this parameter is passed
+ * to the {@link Context} with the label {@link Context#PROVIDER_URL}. Its value must be a URL that identifies the JMS
+ * naming server. The parameter defaults to the string value {@code remote://localhost:4447}.
+ * <li>securityPrincipal: The user name to use for JMS access. The value of this parameter is passed to the
+ * {@link Context} with the label {@link Context#SECURITY_PRINCIPAL}. Its value must be the user name of a user defined
+ * on the JMS server. The parameter defaults to the string value {@code userid}.
+ * <li>securityCredentials:The password to use for JMS access. The value of this parameter is passed to the
+ * {@link Context} with the label {@link Context#SECURITY_CREDENTIALS}. Its value must be the password of a suer defined
+ * on the JMS server. The parameter defaults to the string value {@code password}.
+ * <li>connectionFactory: JMS uses a {@link javax.jms.ConnectionFactory} instance to create connections towards a JMS
+ * server. The connection factory to use is held in the JMS {@link Context} object. This parameter specifies the label
+ * to use to look up the {@link javax.jms.ConnectionFactory} instance from the JMS {@link Context}.
+ * <li>producerTopic: JMS uses a {@link javax.jms.Topic} instance to for sending and receiving messages. The topic to
+ * use for sending events to JMS from an Apex producer is held in the JMS {@link Context} object. This parameter
+ * specifies the label to use to look up the {@link javax.jms.Topic} instance in the JMS {@link Context} for the JMS
+ * server. The topic must, of course, also be defined on the JMS server. The parameter defaults to the string value
+ * {@code apex-out}.
+ * <li>consumerTopic: The topic to use for receiving events from JMS in an Apex consumer is held in the JMS
+ * {@link Context} object. This parameter specifies the label to use to look up the {@link javax.jms.Topic} instance in
+ * the JMS {@link Context} for the JMS server. The topic must, of course, also be defined on the JMS server. The
+ * parameter defaults to the string value {@code apex-in}.
+ * <li>consumerWaitTime: The amount of milliseconds a JMS consumer should wait between checks of its thread execution
+ * status. The parameter defaults to the long value {@code 100}.
+ * <li>objectMessageSending: A flag that indicates whether Apex producers should send JMS messages as
+ * {@link javax.jms.ObjectMessage} instances for java objects (value {@code true}) or as {@link javax.jms.TextMessage}
+ * instances for strings (value {@code false}) . The parameter defaults to the boolean value {@code true}.
+ * </ol>
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class JMSCarrierTechnologyParameters extends CarrierTechnologyParameters {
+ /** The label of this carrier technology. */
+ public static final String JMS_CARRIER_TECHNOLOGY_LABEL = "JMS";
+
+ /** The producer plugin class for the JMS carrier technology. */
+ public static final String JMS_EVENT_PRODUCER_PLUGIN_CLASS = ApexJMSProducer.class.getCanonicalName();
+
+ /** The consumer plugin class for the JMS carrier technology. */
+ public static final String JMS_EVENT_CONSUMER_PLUGIN_CLASS = ApexJMSConsumer.class.getCanonicalName();
+
+ // @formatter:off
+
+ // Default parameter values
+ private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory";
+ private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory";
+ private static final String DEFAULT_PROVIDER_URL = "remote://localhost:4447";
+ private static final String DEFAULT_SECURITY_PRINCIPAL = "userid";
+ private static final String DEFAULT_SECURITY_CREDENTIALS = "password";
+ private static final String DEFAULT_CONSUMER_TOPIC = "apex-in";
+ private static final String DEFAULT_PRODUCER_TOPIC = "apex-out";
+ private static final int DEFAULT_CONSUMER_WAIT_TIME = 100;
+ private static final boolean DEFAULT_TO_OBJECT_MESSAGE_SENDING = true;
+
+ // Parameter property map tokens
+ private static final String PROPERTY_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY;
+ private static final String PROPERTY_PROVIDER_URL = Context.PROVIDER_URL;
+ private static final String PROPERTY_SECURITY_PRINCIPAL = Context.SECURITY_PRINCIPAL;
+ private static final String PROPERTY_SECURITY_CREDENTIALS = Context.SECURITY_CREDENTIALS;
+
+ // JMS carrier parameters
+ private String connectionFactory = DEFAULT_CONNECTION_FACTORY;
+ private String initialContextFactory = DEFAULT_INITIAL_CONTEXT_FACTORY;
+ private String providerURL = DEFAULT_PROVIDER_URL;
+ private String securityPrincipal = DEFAULT_SECURITY_PRINCIPAL;
+ private String securityCredentials = DEFAULT_SECURITY_CREDENTIALS;
+ private String producerTopic = DEFAULT_PRODUCER_TOPIC;
+ private String consumerTopic = DEFAULT_CONSUMER_TOPIC;
+ private int consumerWaitTime = DEFAULT_CONSUMER_WAIT_TIME;
+ private boolean objectMessageSending = DEFAULT_TO_OBJECT_MESSAGE_SENDING;
+ // @formatter:on
+
+ /**
+ * Constructor to create a jms carrier technology parameters instance and register the instance with the parameter
+ * service.
+ */
+ public JMSCarrierTechnologyParameters() {
+ super(JMSCarrierTechnologyParameters.class.getCanonicalName());
+
+ // Set the carrier technology properties for the JMS carrier technology
+ this.setLabel(JMS_CARRIER_TECHNOLOGY_LABEL);
+ this.setEventProducerPluginClass(JMS_EVENT_PRODUCER_PLUGIN_CLASS);
+ this.setEventConsumerPluginClass(JMS_EVENT_CONSUMER_PLUGIN_CLASS);
+ }
+
+ /**
+ * Gets the JMS producer properties.
+ *
+ * @return the JMS producer properties
+ */
+ public Properties getJMSProducerProperties() {
+ final Properties jmsProperties = new Properties();
+
+ jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory);
+ jmsProperties.put(PROPERTY_PROVIDER_URL, providerURL);
+ jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal);
+ jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials);
+
+ return jmsProperties;
+ }
+
+ /**
+ * Gets the jms consumer properties.
+ *
+ * @return the jms consumer properties
+ */
+ public Properties getJMSConsumerProperties() {
+ final Properties jmsProperties = new Properties();
+
+ jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory);
+ jmsProperties.put(PROPERTY_PROVIDER_URL, providerURL);
+ jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal);
+ jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials);
+
+ return jmsProperties;
+ }
+
+ /**
+ * Gets the connection factory.
+ *
+ * @return the connection factory
+ */
+ public String getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ /**
+ * Gets the initial context factory.
+ *
+ * @return the initial context factory
+ */
+ public String getInitialContextFactory() {
+ return initialContextFactory;
+ }
+
+ /**
+ * Gets the provider URL.
+ *
+ * @return the provider URL
+ */
+ public String getProviderURL() {
+ return providerURL;
+ }
+
+ /**
+ * Gets the security principal.
+ *
+ * @return the security principal
+ */
+ public String getSecurityPrincipal() {
+ return securityPrincipal;
+ }
+
+ /**
+ * Gets the security credentials.
+ *
+ * @return the security credentials
+ */
+ public String getSecurityCredentials() {
+ return securityCredentials;
+ }
+
+ /**
+ * Gets the producer topic.
+ *
+ * @return the producer topic
+ */
+ public String getProducerTopic() {
+ return producerTopic;
+ }
+
+ /**
+ * Gets the consumer topic.
+ *
+ * @return the consumer topic
+ */
+ public String getConsumerTopic() {
+ return consumerTopic;
+ }
+
+ /**
+ * Gets the consumer wait time.
+ *
+ * @return the consumer wait time
+ */
+ public long getConsumerWaitTime() {
+ return consumerWaitTime;
+ }
+
+ /**
+ * Sets the connection factory.
+ *
+ * @param connectionFactory the connection factory
+ */
+ public void setConnectionFactory(final String connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * Sets the initial context factory.
+ *
+ * @param initialContextFactory the initial context factory
+ */
+ public void setInitialContextFactory(final String initialContextFactory) {
+ this.initialContextFactory = initialContextFactory;
+ }
+
+ /**
+ * Sets the provider URL.
+ *
+ * @param providerURL the provider URL
+ */
+ public void setProviderURL(final String providerURL) {
+ this.providerURL = providerURL;
+ }
+
+ /**
+ * Sets the security principal.
+ *
+ * @param securityPrincipal the security principal
+ */
+ public void setSecurityPrincipal(final String securityPrincipal) {
+ this.securityPrincipal = securityPrincipal;
+ }
+
+ /**
+ * Sets the security credentials.
+ *
+ * @param securityCredentials the security credentials
+ */
+ public void setSecurityCredentials(final String securityCredentials) {
+ this.securityCredentials = securityCredentials;
+ }
+
+ /**
+ * Sets the producer topic.
+ *
+ * @param producerTopic the producer topic
+ */
+ public void setProducerTopic(final String producerTopic) {
+ this.producerTopic = producerTopic;
+ }
+
+ /**
+ * Sets the consumer topic.
+ *
+ * @param consumerTopic the consumer topic
+ */
+ public void setConsumerTopic(final String consumerTopic) {
+ this.consumerTopic = consumerTopic;
+ }
+
+ /**
+ * Sets the consumer wait time.
+ *
+ * @param consumerWaitTime the consumer wait time
+ */
+ public void setConsumerWaitTime(final int consumerWaitTime) {
+ this.consumerWaitTime = consumerWaitTime;
+ }
+
+ /**
+ * Checks if is object message sending.
+ *
+ * @return true, if checks if is object message sending
+ */
+ public boolean isObjectMessageSending() {
+ return objectMessageSending;
+ }
+
+ /**
+ * Sets the object message sending.
+ *
+ * @param objectMessageSending the object message sending
+ */
+ public void setObjectMessageSending(final boolean objectMessageSending) {
+ this.objectMessageSending = objectMessageSending;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
+ */
+ @Override
+ public String validate() {
+ final StringBuilder errorMessageBuilder = new StringBuilder();
+
+ errorMessageBuilder.append(super.validate());
+
+ if (initialContextFactory == null || initialContextFactory.trim().length() == 0) {
+ errorMessageBuilder
+ .append(" initialContextFactory not specified, must be specified as a string that is a class"
+ + " that implements the interface org.jboss.naming.remote.client.InitialContextFactory\n");
+ }
+
+ if (providerURL == null || providerURL.trim().length() == 0) {
+ errorMessageBuilder.append(
+ " providerURL not specified, must be specified as a URL string that specifies the location of "
+ + "configuration information for the service provider to use such as remote://localhost:4447\n");
+ }
+
+ if (securityPrincipal == null || securityPrincipal.trim().length() == 0) {
+ errorMessageBuilder.append(
+ " securityPrincipal not specified, must be specified the identity of the principal for authenticating the caller to the service\n");
+ }
+
+ if (securityCredentials == null || securityCredentials.trim().length() == 0) {
+ errorMessageBuilder.append(" securityCredentials not specified, must be specified as "
+ + "the credentials of the principal for authenticating the caller to the service\n");
+ }
+
+ if (producerTopic == null || producerTopic.trim().length() == 0) {
+ errorMessageBuilder.append(
+ " producerTopic not specified, must be a string that identifies the JMS topic on which Apex will send events\n");
+ }
+
+ if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+ errorMessageBuilder.append(
+ " consumerTopic not specified, must be a string that identifies the JMS topic on which Apex will recieve events\n");
+ }
+
+ if (consumerWaitTime < 0) {
+ errorMessageBuilder.append(" consumerWaitTime [" + consumerWaitTime
+ + "] invalid, must be specified as consumerWaitTime >= 0\n");
+ }
+
+ return errorMessageBuilder.toString();
+ }
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java
new file mode 100644
index 000000000..115bc5009
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the APEX event carrier technology plugin for JMS.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.jms;