diff options
author | Liam Fallon <liam.fallon@ericsson.com> | 2018-06-08 17:05:17 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-06-08 17:05:17 +0000 |
commit | d0746b062ca6b85eae756db6eb6c4dead037a35a (patch) | |
tree | 896107ee6e4787c42f34225fed944653f18bb2be /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms | |
parent | 6a9b54b275feff5369419a86997e94d0a95fc48e (diff) | |
parent | 697d02bf4e4188e3040cd987dee97d15f397a35f (diff) |
Merge "Adding plugins-event module to apex-pdp"
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms')
5 files changed, 1033 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml new file mode 100644 index 000000000..61abf52b4 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/pom.xml @@ -0,0 +1,54 @@ +<!-- + ============LICENSE_START======================================================= + Copyright (C) 2018 Ericsson. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + SPDX-License-Identifier: Apache-2.0 + ============LICENSE_END========================================================= +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.policy.apex-pdp.plugins.plugins-event.plugins-event-carrier</groupId> + <artifactId>plugins-event-carrier</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>plugins-event-carrier-jms</artifactId> + <name>${project.artifactId}</name> + <description>[${project.parent.artifactId}] Plugin for handling events being transported over JMS</description> + + <properties> + <apex-plugins-event-carrier-jms-dir>${project.basedir}/src</apex-plugins-event-carrier-jms-dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.jboss</groupId> + <artifactId>jboss-remote-naming</artifactId> + <version>2.0.4.Final</version> + </dependency> + <dependency> + <groupId>org.jboss.xnio</groupId> + <artifactId>xnio-nio</artifactId> + <version>3.2.0.Final</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-client</artifactId> + <version>2.3.25.Final</version> + </dependency> + </dependencies> +</project>
\ No newline at end of file diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java new file mode 100644 index 000000000..745a1e98c --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java @@ -0,0 +1,295 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.util.EnumMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements an Apex event consumer that receives events using JMS. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runnable { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSConsumer.class); + + // The Apex and JMS parameters read from the parameter service + private JMSCarrierTechnologyParameters jmsConsumerProperties; + + // The event receiver that will receive events from this consumer + private ApexEventReceiver eventReceiver; + + // The consumer thread and stopping flag + private Thread consumerThread; + private boolean stopOrderedFlag = false; + + // The connection to the JMS server + private Connection connection; + + // The topic on which we receive events from JMS + private Topic jmsIncomingTopic; + + // The name for this consumer + private String name = null; + + // The peer references for this event handler + private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + + this.name = consumerName; + + // Check and get the JMS Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties of type \"" + + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + + "\" are not applicable to a JMS consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + jmsConsumerProperties = (JMSCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // Look up the JMS connection factory + InitialContext jmsContext = null; + ConnectionFactory connectionFactory = null; + try { + jmsContext = new InitialContext(jmsConsumerProperties.getJMSConsumerProperties()); + connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory()); + + // Check if we actually got a connection factory + if (connectionFactory == null) { + throw new NullPointerException( + "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS connection factory \"" + + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \"" + + jmsConsumerProperties.getJMSConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Lookup the topic on which we will receive events + try { + jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic()); + + // Check if we actually got a topic + if (jmsIncomingTopic == null) { + throw new NullPointerException( + "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS topic \"" + jmsConsumerProperties.getConsumerTopic() + + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJMSConsumerProperties() + + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create and start a connection to the JMS server + try { + connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(), + jmsConsumerProperties.getSecurityCredentials()); + connection.start(); + } catch (final Exception e) { + final String errorMessage = "connection to the JMS server failed for JMS properties \"" + + jmsConsumerProperties.getJMSConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // JMS session and message consumer for receiving messages + Session jmsSession = null; + MessageConsumer messageConsumer = null; + + // Create a session to the JMS server + try { + jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (final Exception e) { + final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Create a message consumer for reception of messages and set this class as a message listener + try { + messageConsumer = jmsSession.createConsumer(jmsIncomingTopic); + messageConsumer.setMessageListener(this); + } catch (final Exception e) { + final String errorMessage = "failed to create a JMS message consumer for receiving messages"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Everything is now set up + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: " + + jmsConsumerProperties.getConsumerTopic()); + } + + // The endless loop that receives events over JMS + while (consumerThread.isAlive() && !stopOrderedFlag) { + ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); + } + + // Close the message consumer + try { + messageConsumer.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close the JMS message consumer for receiving messages"; + LOGGER.warn(errorMessage, e); + } + + // Close the session + try { + jmsSession.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close the JMS session for receiving messages"; + LOGGER.warn(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see javax.jms.MessageListener#onMessage(javax.jms.Message) + */ + @Override + public void onMessage(final Message jmsMessage) { + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(), + jmsMessage.getJMSType()); + } + + eventReceiver.receiveEvent(jmsMessage); + } catch (final Exception e) { + final String errorMessage = "failed to receive message from JMS"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + stopOrderedFlag = true; + + while (consumerThread.isAlive()) { + ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); + } + + // Close the connection to the JMS server + try { + if (connection != null) { + connection.close(); + } + } catch (final Exception e) { + final String errorMessage = "close of connection to the JMS server failed"; + LOGGER.warn(errorMessage, e); + } + } + +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java new file mode 100644 index 000000000..017f07f6f --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java @@ -0,0 +1,289 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.io.Serializable; +import java.util.EnumMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events using JMS. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexJMSProducer implements ApexEventProducer { + + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class); + + // The JMS parameters read from the parameter service + private JMSCarrierTechnologyParameters jmsProducerProperties; + + // The connection to the JMS server + private Connection connection; + + // The topic on which we send events to JMS + private Topic jmsOutgoingTopic; + + // The JMS session on which we will send events + private Session jmsSession; + + // The producer on which we will send events + private MessageProducer messageProducer; + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String, + * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters) + */ + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the JMS Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) { + LOGGER.warn("specified producer properties are not applicable to a JMS producer (" + this.name + ")"); + throw new ApexEventException( + "specified producer properties are not applicable to a JMS producer (" + this.name + ")"); + } + jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + // Look up the JMS connection factory + InitialContext jmsContext = null; + ConnectionFactory connectionFactory = null; + try { + jmsContext = new InitialContext(jmsProducerProperties.getJMSProducerProperties()); + connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory()); + + // Check if we actually got a connection factory + if (connectionFactory == null) { + throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory() + + "\" returned null for producer (" + this.name + ")"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS connection factory \"" + + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \"" + + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Lookup the topic on which we will send events + try { + jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic()); + + // Check if we actually got a topic + if (jmsOutgoingTopic == null) { + throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic() + + "\" returned null for producer (" + this.name + ")"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic() + + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJMSProducerProperties() + + "\" for producer (" + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create and start a connection to the JMS server + try { + connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(), + jmsProducerProperties.getSecurityCredentials()); + connection.start(); + } catch (final Exception e) { + final String errorMessage = "connection to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create a JMS session for sending events + try { + jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (final Exception e) { + final String errorMessage = "creation of session to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJMSConsumerProperties() + "\" for producer (" + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create a JMS message producer for sending events + try { + messageProducer = jmsSession.createProducer(jmsOutgoingTopic); + } catch (final Exception e) { + final String errorMessage = + "creation of producer for sending events to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJMSConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String, + * java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventname, final Object eventObject) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Check if the object to be sent is serializable + if (!Serializable.class.isAssignableFrom(eventObject.getClass())) { + final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " + + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName() + + "\" is not serializable"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + // The JMS message to send is constructed using the JMS session + Message jmsMessage = null; + + // Check the type of JMS message to send + if (jmsProducerProperties.isObjectMessageSending()) { + // We should send a JMS Object Message + try { + jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject); + } catch (final Exception e) { + final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " + + this.name + ", could not create JMS Object Message for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } else { + // We should send a JMS Text Message + try { + jmsMessage = jmsSession.createTextMessage(eventObject.toString()); + } catch (final Exception e) { + final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " + + this.name + ", could not create JMS Text Message for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + try { + messageProducer.send(jmsMessage); + } catch (final Exception e) { + final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " + + this.name + ", send failed for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + // Close the message producer + try { + messageProducer.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages"; + LOGGER.warn(errorMessage, e); + } + + // Close the session + try { + jmsSession.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages"; + LOGGER.warn(errorMessage, e); + } + + // Close the connection to the JMS server + try { + connection.close(); + } catch (final Exception e) { + final String errorMessage = "close of connection to the JMS server for " + this.name + " failed"; + LOGGER.warn(errorMessage, e); + } + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java new file mode 100644 index 000000000..48f5fe267 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java @@ -0,0 +1,368 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.util.Properties; + +import javax.naming.Context; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * Apex parameters for JMS as an event carrier technology. + * <p> + * The parameters for this plugin are: + * <ol> + * <li>initialContextFactory: JMS uses a naming {@link Context} object to look up the locations of JMS servers and JMS + * topics. An Initial Context Factory is used to when creating a {@link Context} object that can be used for JMS + * lookups. The value of this parameter is passed to the {@link Context} with the label + * {@link Context#INITIAL_CONTEXT_FACTORY}. Its value must be the full canonical path to a class that implements the + * {@code javax.naming.spi.InitialContextFactory} interface. The parameter defaults to the string value + * {@code org.jboss.naming.remote.client.InitialContextFactory}. + * <li>providerURL: The location of the server to use for naming context lookups. The value of this parameter is passed + * to the {@link Context} with the label {@link Context#PROVIDER_URL}. Its value must be a URL that identifies the JMS + * naming server. The parameter defaults to the string value {@code remote://localhost:4447}. + * <li>securityPrincipal: The user name to use for JMS access. The value of this parameter is passed to the + * {@link Context} with the label {@link Context#SECURITY_PRINCIPAL}. Its value must be the user name of a user defined + * on the JMS server. The parameter defaults to the string value {@code userid}. + * <li>securityCredentials:The password to use for JMS access. The value of this parameter is passed to the + * {@link Context} with the label {@link Context#SECURITY_CREDENTIALS}. Its value must be the password of a suer defined + * on the JMS server. The parameter defaults to the string value {@code password}. + * <li>connectionFactory: JMS uses a {@link javax.jms.ConnectionFactory} instance to create connections towards a JMS + * server. The connection factory to use is held in the JMS {@link Context} object. This parameter specifies the label + * to use to look up the {@link javax.jms.ConnectionFactory} instance from the JMS {@link Context}. + * <li>producerTopic: JMS uses a {@link javax.jms.Topic} instance to for sending and receiving messages. The topic to + * use for sending events to JMS from an Apex producer is held in the JMS {@link Context} object. This parameter + * specifies the label to use to look up the {@link javax.jms.Topic} instance in the JMS {@link Context} for the JMS + * server. The topic must, of course, also be defined on the JMS server. The parameter defaults to the string value + * {@code apex-out}. + * <li>consumerTopic: The topic to use for receiving events from JMS in an Apex consumer is held in the JMS + * {@link Context} object. This parameter specifies the label to use to look up the {@link javax.jms.Topic} instance in + * the JMS {@link Context} for the JMS server. The topic must, of course, also be defined on the JMS server. The + * parameter defaults to the string value {@code apex-in}. + * <li>consumerWaitTime: The amount of milliseconds a JMS consumer should wait between checks of its thread execution + * status. The parameter defaults to the long value {@code 100}. + * <li>objectMessageSending: A flag that indicates whether Apex producers should send JMS messages as + * {@link javax.jms.ObjectMessage} instances for java objects (value {@code true}) or as {@link javax.jms.TextMessage} + * instances for strings (value {@code false}) . The parameter defaults to the boolean value {@code true}. + * </ol> + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class JMSCarrierTechnologyParameters extends CarrierTechnologyParameters { + /** The label of this carrier technology. */ + public static final String JMS_CARRIER_TECHNOLOGY_LABEL = "JMS"; + + /** The producer plugin class for the JMS carrier technology. */ + public static final String JMS_EVENT_PRODUCER_PLUGIN_CLASS = ApexJMSProducer.class.getCanonicalName(); + + /** The consumer plugin class for the JMS carrier technology. */ + public static final String JMS_EVENT_CONSUMER_PLUGIN_CLASS = ApexJMSConsumer.class.getCanonicalName(); + + // @formatter:off + + // Default parameter values + private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory"; + private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory"; + private static final String DEFAULT_PROVIDER_URL = "remote://localhost:4447"; + private static final String DEFAULT_SECURITY_PRINCIPAL = "userid"; + private static final String DEFAULT_SECURITY_CREDENTIALS = "password"; + private static final String DEFAULT_CONSUMER_TOPIC = "apex-in"; + private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; + private static final int DEFAULT_CONSUMER_WAIT_TIME = 100; + private static final boolean DEFAULT_TO_OBJECT_MESSAGE_SENDING = true; + + // Parameter property map tokens + private static final String PROPERTY_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY; + private static final String PROPERTY_PROVIDER_URL = Context.PROVIDER_URL; + private static final String PROPERTY_SECURITY_PRINCIPAL = Context.SECURITY_PRINCIPAL; + private static final String PROPERTY_SECURITY_CREDENTIALS = Context.SECURITY_CREDENTIALS; + + // JMS carrier parameters + private String connectionFactory = DEFAULT_CONNECTION_FACTORY; + private String initialContextFactory = DEFAULT_INITIAL_CONTEXT_FACTORY; + private String providerURL = DEFAULT_PROVIDER_URL; + private String securityPrincipal = DEFAULT_SECURITY_PRINCIPAL; + private String securityCredentials = DEFAULT_SECURITY_CREDENTIALS; + private String producerTopic = DEFAULT_PRODUCER_TOPIC; + private String consumerTopic = DEFAULT_CONSUMER_TOPIC; + private int consumerWaitTime = DEFAULT_CONSUMER_WAIT_TIME; + private boolean objectMessageSending = DEFAULT_TO_OBJECT_MESSAGE_SENDING; + // @formatter:on + + /** + * Constructor to create a jms carrier technology parameters instance and register the instance with the parameter + * service. + */ + public JMSCarrierTechnologyParameters() { + super(JMSCarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the JMS carrier technology + this.setLabel(JMS_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(JMS_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(JMS_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the JMS producer properties. + * + * @return the JMS producer properties + */ + public Properties getJMSProducerProperties() { + final Properties jmsProperties = new Properties(); + + jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory); + jmsProperties.put(PROPERTY_PROVIDER_URL, providerURL); + jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal); + jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials); + + return jmsProperties; + } + + /** + * Gets the jms consumer properties. + * + * @return the jms consumer properties + */ + public Properties getJMSConsumerProperties() { + final Properties jmsProperties = new Properties(); + + jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory); + jmsProperties.put(PROPERTY_PROVIDER_URL, providerURL); + jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal); + jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials); + + return jmsProperties; + } + + /** + * Gets the connection factory. + * + * @return the connection factory + */ + public String getConnectionFactory() { + return connectionFactory; + } + + /** + * Gets the initial context factory. + * + * @return the initial context factory + */ + public String getInitialContextFactory() { + return initialContextFactory; + } + + /** + * Gets the provider URL. + * + * @return the provider URL + */ + public String getProviderURL() { + return providerURL; + } + + /** + * Gets the security principal. + * + * @return the security principal + */ + public String getSecurityPrincipal() { + return securityPrincipal; + } + + /** + * Gets the security credentials. + * + * @return the security credentials + */ + public String getSecurityCredentials() { + return securityCredentials; + } + + /** + * Gets the producer topic. + * + * @return the producer topic + */ + public String getProducerTopic() { + return producerTopic; + } + + /** + * Gets the consumer topic. + * + * @return the consumer topic + */ + public String getConsumerTopic() { + return consumerTopic; + } + + /** + * Gets the consumer wait time. + * + * @return the consumer wait time + */ + public long getConsumerWaitTime() { + return consumerWaitTime; + } + + /** + * Sets the connection factory. + * + * @param connectionFactory the connection factory + */ + public void setConnectionFactory(final String connectionFactory) { + this.connectionFactory = connectionFactory; + } + + /** + * Sets the initial context factory. + * + * @param initialContextFactory the initial context factory + */ + public void setInitialContextFactory(final String initialContextFactory) { + this.initialContextFactory = initialContextFactory; + } + + /** + * Sets the provider URL. + * + * @param providerURL the provider URL + */ + public void setProviderURL(final String providerURL) { + this.providerURL = providerURL; + } + + /** + * Sets the security principal. + * + * @param securityPrincipal the security principal + */ + public void setSecurityPrincipal(final String securityPrincipal) { + this.securityPrincipal = securityPrincipal; + } + + /** + * Sets the security credentials. + * + * @param securityCredentials the security credentials + */ + public void setSecurityCredentials(final String securityCredentials) { + this.securityCredentials = securityCredentials; + } + + /** + * Sets the producer topic. + * + * @param producerTopic the producer topic + */ + public void setProducerTopic(final String producerTopic) { + this.producerTopic = producerTopic; + } + + /** + * Sets the consumer topic. + * + * @param consumerTopic the consumer topic + */ + public void setConsumerTopic(final String consumerTopic) { + this.consumerTopic = consumerTopic; + } + + /** + * Sets the consumer wait time. + * + * @param consumerWaitTime the consumer wait time + */ + public void setConsumerWaitTime(final int consumerWaitTime) { + this.consumerWaitTime = consumerWaitTime; + } + + /** + * Checks if is object message sending. + * + * @return true, if checks if is object message sending + */ + public boolean isObjectMessageSending() { + return objectMessageSending; + } + + /** + * Sets the object message sending. + * + * @param objectMessageSending the object message sending + */ + public void setObjectMessageSending(final boolean objectMessageSending) { + this.objectMessageSending = objectMessageSending; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + final StringBuilder errorMessageBuilder = new StringBuilder(); + + errorMessageBuilder.append(super.validate()); + + if (initialContextFactory == null || initialContextFactory.trim().length() == 0) { + errorMessageBuilder + .append(" initialContextFactory not specified, must be specified as a string that is a class" + + " that implements the interface org.jboss.naming.remote.client.InitialContextFactory\n"); + } + + if (providerURL == null || providerURL.trim().length() == 0) { + errorMessageBuilder.append( + " providerURL not specified, must be specified as a URL string that specifies the location of " + + "configuration information for the service provider to use such as remote://localhost:4447\n"); + } + + if (securityPrincipal == null || securityPrincipal.trim().length() == 0) { + errorMessageBuilder.append( + " securityPrincipal not specified, must be specified the identity of the principal for authenticating the caller to the service\n"); + } + + if (securityCredentials == null || securityCredentials.trim().length() == 0) { + errorMessageBuilder.append(" securityCredentials not specified, must be specified as " + + "the credentials of the principal for authenticating the caller to the service\n"); + } + + if (producerTopic == null || producerTopic.trim().length() == 0) { + errorMessageBuilder.append( + " producerTopic not specified, must be a string that identifies the JMS topic on which Apex will send events\n"); + } + + if (consumerTopic == null || consumerTopic.trim().length() == 0) { + errorMessageBuilder.append( + " consumerTopic not specified, must be a string that identifies the JMS topic on which Apex will recieve events\n"); + } + + if (consumerWaitTime < 0) { + errorMessageBuilder.append(" consumerWaitTime [" + consumerWaitTime + + "] invalid, must be specified as consumerWaitTime >= 0\n"); + } + + return errorMessageBuilder.toString(); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java new file mode 100644 index 000000000..115bc5009 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the APEX event carrier technology plugin for JMS. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; |