From 4cfa2e2d98f6877d54da304ef17f096284430908 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Thu, 13 Sep 2018 15:25:32 +0100 Subject: Sonar/Checkstyle in service/plugins Sonar and Checkstyle changes in plugins and services, and knock on changes Issue-ID: POLICY-1034 Change-Id: Iff7df74e54fce2c661dcc2fae75ae93d4cacfe5b Signed-off-by: liamfallon --- .../plugins/event/carrier/jms/ApexJMSConsumer.java | 280 -------------- .../plugins/event/carrier/jms/ApexJMSProducer.java | 287 -------------- .../plugins/event/carrier/jms/ApexJmsConsumer.java | 280 ++++++++++++++ .../plugins/event/carrier/jms/ApexJmsProducer.java | 292 ++++++++++++++ .../jms/JMSCarrierTechnologyParameters.java | 375 ------------------ .../jms/JmsCarrierTechnologyParameters.java | 375 ++++++++++++++++++ .../event/carrier/kafka/ApexKafkaConsumer.java | 10 +- .../event/carrier/kafka/ApexKafkaProducer.java | 13 +- .../kafka/KAFKACarrierTechnologyParameters.java | 419 --------------------- .../kafka/KafkaCarrierTechnologyParameters.java | 419 +++++++++++++++++++++ .../carrier/restclient/ApexRestClientConsumer.java | 16 +- .../carrier/restclient/ApexRestClientProducer.java | 33 +- .../RESTClientCarrierTechnologyParameters.java | 135 ------- .../RestClientCarrierTechnologyParameters.java | 135 +++++++ .../carrier/restrequestor/ApexRestRequest.java | 35 ++ .../restrequestor/ApexRestRequestorConsumer.java | 68 ++-- .../restrequestor/ApexRestRequestorProducer.java | 6 +- .../RESTRequestorCarrierTechnologyParameters.java | 125 ------ .../RestRequestorCarrierTechnologyParameters.java | 125 ++++++ .../adapt/restrequestor/TestRESTRequestor.java | 291 -------------- .../adapt/restrequestor/TestRestRequestor.java | 363 ++++++++++++++++++ .../restrequestor/TestRestRequestorEndpoint.java | 56 +++ .../prodcons/File2RESTRequest2FileDelete.json | 6 +- .../prodcons/File2RESTRequest2FileGet.json | 6 +- .../File2RESTRequest2FileGetConsumerAlone.json | 6 +- .../prodcons/File2RESTRequest2FileGetMulti.json | 10 +- .../File2RESTRequest2FileGetProducerAlone.json | 4 +- .../prodcons/File2RESTRequest2FilePost.json | 6 +- .../prodcons/File2RESTRequest2FilePut.json | 6 +- .../carrier/restserver/ApexRestServerConsumer.java | 28 +- .../carrier/restserver/ApexRestServerProducer.java | 18 +- .../RESTServerCarrierTechnologyParameters.java | 136 ------- .../RestServerCarrierTechnologyParameters.java | 136 +++++++ .../carrier/restserver/RestServerEndpoint.java | 20 +- .../carrier/websocket/ApexWebSocketConsumer.java | 11 +- .../carrier/websocket/ApexWebSocketProducer.java | 31 +- .../WEBSOCKETCarrierTechnologyParameters.java | 116 ------ .../WebSocketCarrierTechnologyParameters.java | 116 ++++++ 38 files changed, 2487 insertions(+), 2307 deletions(-) delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsProducer.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RESTClientCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RestClientCarrierTechnologyParameters.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RESTRequestorCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParameters.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRESTRequestor.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestor.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RESTServerCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerCarrierTechnologyParameters.java delete mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WEBSOCKETCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WebSocketCarrierTechnologyParameters.java (limited to 'plugins/plugins-event/plugins-event-carrier') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java deleted file mode 100644 index 93174b941..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSConsumer.java +++ /dev/null @@ -1,280 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.jms; - -import java.util.EnumMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.InitialContext; - -import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; -import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.service.engine.event.ApexEventConsumer; -import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; -import org.onap.policy.apex.service.engine.event.PeeredReference; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class implements an Apex event consumer that receives events using JMS. - * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class ApexJMSConsumer implements MessageListener, ApexEventConsumer, Runnable { - // Get a reference to the logger - private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSConsumer.class); - - // The Apex and JMS parameters read from the parameter service - private JMSCarrierTechnologyParameters jmsConsumerProperties; - - // The event receiver that will receive events from this consumer - private ApexEventReceiver eventReceiver; - - // The consumer thread and stopping flag - private Thread consumerThread; - private boolean stopOrderedFlag = false; - - // The connection to the JMS server - private Connection connection; - - // The topic on which we receive events from JMS - private Topic jmsIncomingTopic; - - // The name for this consumer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - @Override - public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { - this.eventReceiver = incomingEventReceiver; - - this.name = consumerName; - - // Check and get the JMS Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) { - final String errorMessage = "specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() - + "\" are not applicable to a JMS consumer"; - LOGGER.warn(errorMessage); - throw new ApexEventException(errorMessage); - } - jmsConsumerProperties = (JMSCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); - - // Look up the JMS connection factory - InitialContext jmsContext = null; - ConnectionFactory connectionFactory = null; - try { - jmsContext = new InitialContext(jmsConsumerProperties.getJmsConsumerProperties()); - connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory()); - - // Check if we actually got a connection factory - if (connectionFactory == null) { - throw new NullPointerException( - "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null"); - } - } catch (final Exception e) { - final String errorMessage = "lookup of JMS connection factory \"" - + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \"" - + jmsConsumerProperties.getJmsConsumerProperties() + "\""; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Lookup the topic on which we will receive events - try { - jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic()); - - // Check if we actually got a topic - if (jmsIncomingTopic == null) { - throw new NullPointerException( - "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null"); - } - } catch (final Exception e) { - final String errorMessage = "lookup of JMS topic \"" + jmsConsumerProperties.getConsumerTopic() - + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJmsConsumerProperties() - + "\""; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Create and start a connection to the JMS server - try { - connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(), - jmsConsumerProperties.getSecurityCredentials()); - connection.start(); - } catch (final Exception e) { - final String errorMessage = "connection to the JMS server failed for JMS properties \"" - + jmsConsumerProperties.getJmsConsumerProperties() + "\""; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() - */ - @Override - public void start() { - // Configure and start the event reception thread - final String threadName = this.getClass().getName() + ":" + this.name; - consumerThread = new ApplicationThreadFactory(threadName).newThread(this); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() - */ - @Override - public String getName() { - return name; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service. - * parameters. eventhandler.EventHandlerPeeredMode) - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service. - * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - // JMS session and message consumer for receiving messages - try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { - // Create a message consumer for reception of messages and set this class as a message listener - createMessageConsumer(jmsSession); - } catch (final Exception e) { - final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages"; - LOGGER.warn(errorMessage, e); - throw new ApexEventRuntimeException(errorMessage, e); - } - // Everything is now set up - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: " - + jmsConsumerProperties.getConsumerTopic()); - } - } - - /** - * The helper function to create a message consumer from a given JMS session - * - * @param jmsSession a JMS session - */ - private void createMessageConsumer(final Session jmsSession) { - try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) { - messageConsumer.setMessageListener(this); - - // The endless loop that receives events over JMS - while (consumerThread.isAlive() && !stopOrderedFlag) { - ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); - } - } catch (final Exception e) { - final String errorMessage = "failed to create a JMS message consumer for receiving messages"; - LOGGER.warn(errorMessage, e); - throw new ApexEventRuntimeException(errorMessage, e); - } - } - - /* - * (non-Javadoc) - * - * @see javax.jms.MessageListener#onMessage(javax.jms.Message) - */ - @Override - public void onMessage(final Message jmsMessage) { - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(), - jmsMessage.getJMSType()); - } - - eventReceiver.receiveEvent(jmsMessage); - } catch (final Exception e) { - final String errorMessage = "failed to receive message from JMS"; - LOGGER.warn(errorMessage, e); - throw new ApexEventRuntimeException(errorMessage, e); - } - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() - */ - @Override - public void stop() { - stopOrderedFlag = true; - - while (consumerThread.isAlive()) { - ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); - } - - // Close the connection to the JMS server - try { - if (connection != null) { - connection.close(); - } - } catch (final Exception e) { - final String errorMessage = "close of connection to the JMS server failed"; - LOGGER.warn(errorMessage, e); - } - } - -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java deleted file mode 100644 index 86e9555f9..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJMSProducer.java +++ /dev/null @@ -1,287 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.jms; - -import java.io.Serializable; -import java.util.EnumMap; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.InitialContext; - -import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; -import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; -import org.onap.policy.apex.service.engine.event.PeeredReference; -import org.onap.policy.apex.service.engine.event.SynchronousEventCache; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; -import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Concrete implementation of an Apex event producer that sends events using JMS. - * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class ApexJMSProducer implements ApexEventProducer { - - // Get a reference to the logger - private static final Logger LOGGER = LoggerFactory.getLogger(ApexJMSProducer.class); - - // The JMS parameters read from the parameter service - private JMSCarrierTechnologyParameters jmsProducerProperties; - - // The connection to the JMS server - private Connection connection; - - // The JMS session on which we will send events - private Session jmsSession; - - // The producer on which we will send events - private MessageProducer messageProducer; - - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String, - * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters) - */ - @Override - public void init(final String producerName, final EventHandlerParameters producerParameters) - throws ApexEventException { - this.name = producerName; - - // Check and get the JMS Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof JMSCarrierTechnologyParameters)) { - final String errorMessage = "specified producer properties are not applicable to a JMS producer (" + this.name + ")"; - LOGGER.warn(errorMessage); - throw new ApexEventException(errorMessage); - } - jmsProducerProperties = (JMSCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); - - // Look up the JMS connection factory - InitialContext jmsContext = null; - ConnectionFactory connectionFactory = null; - try { - jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties()); - connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory()); - - // Check if we actually got a connection factory - if (connectionFactory == null) { - throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory() - + "\" returned null for producer (" + this.name + ")"); - } - } catch (final Exception e) { - final String errorMessage = "lookup of JMS connection factory \"" - + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \"" - + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")"; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Lookup the topic on which we will send events - Topic jmsOutgoingTopic; - try { - jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic()); - - // Check if we actually got a topic - if (jmsOutgoingTopic == null) { - throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic() - + "\" returned null for producer (" + this.name + ")"); - } - } catch (final Exception e) { - final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic() - + "\" failed for JMS producer properties \"" + jmsProducerProperties.getJmsProducerProperties() - + "\" for producer (" + this.name + ")"; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Create and start a connection to the JMS server - try { - connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(), - jmsProducerProperties.getSecurityCredentials()); - connection.start(); - } catch (final Exception e) { - final String errorMessage = "connection to JMS server failed for JMS properties \"" - + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")"; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Create a JMS session for sending events - try { - jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } catch (final Exception e) { - final String errorMessage = "creation of session to JMS server failed for JMS properties \"" - + jmsProducerProperties.getJmsConsumerProperties() + "\" for producer (" + this.name + ")"; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - - // Create a JMS message producer for sending events - try { - messageProducer = jmsSession.createProducer(jmsOutgoingTopic); - } catch (final Exception e) { - final String errorMessage = - "creation of producer for sending events to JMS server failed for JMS properties \"" - + jmsProducerProperties.getJmsConsumerProperties() + "\""; - LOGGER.warn(errorMessage, e); - throw new ApexEventException(errorMessage, e); - } - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() - */ - @Override - public String getName() { - return name; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service. - * parameters. eventhandler.EventHandlerPeeredMode) - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service. - * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String, - * java.lang.Object) - */ - @Override - public void sendEvent(final long executionId, final String eventname, final Object eventObject) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } - - // Check if the object to be sent is serializable - if (!Serializable.class.isAssignableFrom(eventObject.getClass())) { - final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " - + this.name + ", object of type \"" + eventObject.getClass().getCanonicalName() - + "\" is not serializable"; - LOGGER.warn(errorMessage); - throw new ApexEventRuntimeException(errorMessage); - } - - // The JMS message to send is constructed using the JMS session - Message jmsMessage = null; - - // Check the type of JMS message to send - if (jmsProducerProperties.isObjectMessageSending()) { - // We should send a JMS Object Message - try { - jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject); - } catch (final Exception e) { - final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " - + this.name + ", could not create JMS Object Message for object \"" + eventObject; - LOGGER.warn(errorMessage); - throw new ApexEventRuntimeException(errorMessage); - } - } else { - // We should send a JMS Text Message - try { - jmsMessage = jmsSession.createTextMessage(eventObject.toString()); - } catch (final Exception e) { - final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " - + this.name + ", could not create JMS Text Message for object \"" + eventObject; - LOGGER.warn(errorMessage); - throw new ApexEventRuntimeException(errorMessage); - } - } - - try { - messageProducer.send(jmsMessage); - } catch (final Exception e) { - final String errorMessage = "could not send event \"" + eventname + "\" on JMS message producer " - + this.name + ", send failed for object \"" + eventObject; - LOGGER.warn(errorMessage); - throw new ApexEventRuntimeException(errorMessage); - } - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() - */ - @Override - public void stop() { - // Close the message producer - try { - messageProducer.close(); - } catch (final Exception e) { - final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages"; - LOGGER.warn(errorMessage, e); - } - - // Close the session - try { - jmsSession.close(); - } catch (final Exception e) { - final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages"; - LOGGER.warn(errorMessage, e); - } - - // Close the connection to the JMS server - try { - connection.close(); - } catch (final Exception e) { - final String errorMessage = "close of connection to the JMS server for " + this.name + " failed"; - LOGGER.warn(errorMessage, e); - } - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java new file mode 100644 index 000000000..de324512b --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsConsumer.java @@ -0,0 +1,280 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.util.EnumMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements an Apex event consumer that receives events using JMS. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexJmsConsumer implements MessageListener, ApexEventConsumer, Runnable { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsConsumer.class); + + // The Apex and JMS parameters read from the parameter service + private JmsCarrierTechnologyParameters jmsConsumerProperties; + + // The event receiver that will receive events from this consumer + private ApexEventReceiver eventReceiver; + + // The consumer thread and stopping flag + private Thread consumerThread; + private boolean stopOrderedFlag = false; + + // The connection to the JMS server + private Connection connection; + + // The topic on which we receive events from JMS + private Topic jmsIncomingTopic; + + // The name for this consumer + private String name = null; + + // The peer references for this event handler + private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + + this.name = consumerName; + + // Check and get the JMS Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties of type \"" + + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + + "\" are not applicable to a JMS consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + jmsConsumerProperties = (JmsCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // Look up the JMS connection factory + InitialContext jmsContext = null; + ConnectionFactory connectionFactory = null; + try { + jmsContext = new InitialContext(jmsConsumerProperties.getJmsConsumerProperties()); + connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsConsumerProperties.getConnectionFactory()); + + // Check if we actually got a connection factory + if (connectionFactory == null) { + throw new NullPointerException( + "JMS context lookup of \"" + jmsConsumerProperties.getConnectionFactory() + "\" returned null"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS connection factory \"" + + jmsConsumerProperties.getConnectionFactory() + "\" failed for JMS consumer properties \"" + + jmsConsumerProperties.getJmsConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Lookup the topic on which we will receive events + try { + jmsIncomingTopic = (Topic) jmsContext.lookup(jmsConsumerProperties.getConsumerTopic()); + + // Check if we actually got a topic + if (jmsIncomingTopic == null) { + throw new NullPointerException( + "JMS context lookup of \"" + jmsConsumerProperties.getConsumerTopic() + "\" returned null"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS topic \"" + jmsConsumerProperties.getConsumerTopic() + + "\" failed for JMS consumer properties \"" + jmsConsumerProperties.getJmsConsumerProperties() + + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create and start a connection to the JMS server + try { + connection = connectionFactory.createConnection(jmsConsumerProperties.getSecurityPrincipal(), + jmsConsumerProperties.getSecurityCredentials()); + connection.start(); + } catch (final Exception e) { + final String errorMessage = "connection to the JMS server failed for JMS properties \"" + + jmsConsumerProperties.getJmsConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // JMS session and message consumer for receiving messages + try (final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + // Create a message consumer for reception of messages and set this class as a message listener + createMessageConsumer(jmsSession); + } catch (final Exception e) { + final String errorMessage = "failed to create a JMS session towards the JMS server for receiving messages"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + // Everything is now set up + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver " + this.getClass().getName() + ":" + this.name + " subscribed to JMS topic: " + + jmsConsumerProperties.getConsumerTopic()); + } + } + + /** + * The helper function to create a message consumer from a given JMS session. + * + * @param jmsSession a JMS session + */ + private void createMessageConsumer(final Session jmsSession) { + try (final MessageConsumer messageConsumer = jmsSession.createConsumer(jmsIncomingTopic)) { + messageConsumer.setMessageListener(this); + + // The endless loop that receives events over JMS + while (consumerThread.isAlive() && !stopOrderedFlag) { + ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); + } + } catch (final Exception e) { + final String errorMessage = "failed to create a JMS message consumer for receiving messages"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see javax.jms.MessageListener#onMessage(javax.jms.Message) + */ + @Override + public void onMessage(final Message jmsMessage) { + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, jmsMessage.getJMSMessageID(), + jmsMessage.getJMSType()); + } + + eventReceiver.receiveEvent(jmsMessage); + } catch (final Exception e) { + final String errorMessage = "failed to receive message from JMS"; + LOGGER.warn(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + stopOrderedFlag = true; + + while (consumerThread.isAlive()) { + ThreadUtilities.sleep(jmsConsumerProperties.getConsumerWaitTime()); + } + + // Close the connection to the JMS server + try { + if (connection != null) { + connection.close(); + } + } catch (final Exception e) { + final String errorMessage = "close of connection to the JMS server failed"; + LOGGER.warn(errorMessage, e); + } + } + +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsProducer.java new file mode 100644 index 000000000..7277a7dc3 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/ApexJmsProducer.java @@ -0,0 +1,292 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.io.Serializable; +import java.util.EnumMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events using JMS. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexJmsProducer implements ApexEventProducer { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexJmsProducer.class); + + // Recurring string constants + private static final String COULD_NOT_SEND_PREFIX = "could not send event \""; + private static final String FOR_PRODUCER_TAG = "\" for producer ("; + private static final String JMS_MESSAGE_PRODUCER_TAG = "\" on JMS message producer "; + + // The JMS parameters read from the parameter service + private JmsCarrierTechnologyParameters jmsProducerProperties; + + // The connection to the JMS server + private Connection connection; + + // The JMS session on which we will send events + private Session jmsSession; + + // The producer on which we will send events + private MessageProducer messageProducer; + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String, + * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters) + */ + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the JMS Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof JmsCarrierTechnologyParameters)) { + final String errorMessage = "specified producer properties are not applicable to a JMS producer (" + + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + jmsProducerProperties = (JmsCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + // Look up the JMS connection factory + InitialContext jmsContext = null; + ConnectionFactory connectionFactory = null; + try { + jmsContext = new InitialContext(jmsProducerProperties.getJmsProducerProperties()); + connectionFactory = (ConnectionFactory) jmsContext.lookup(jmsProducerProperties.getConnectionFactory()); + + // Check if we actually got a connection factory + if (connectionFactory == null) { + throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getConnectionFactory() + + "\" returned null for producer (" + this.name + ")"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS connection factory \"" + + jmsProducerProperties.getConnectionFactory() + "\" failed for JMS producer properties \"" + + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Lookup the topic on which we will send events + Topic jmsOutgoingTopic; + try { + jmsOutgoingTopic = (Topic) jmsContext.lookup(jmsProducerProperties.getProducerTopic()); + + // Check if we actually got a topic + if (jmsOutgoingTopic == null) { + throw new NullPointerException("JMS context lookup of \"" + jmsProducerProperties.getProducerTopic() + + "\" returned null for producer (" + this.name + ")"); + } + } catch (final Exception e) { + final String errorMessage = "lookup of JMS topic \"" + jmsProducerProperties.getProducerTopic() + + "\" failed for JMS producer properties \"" + + jmsProducerProperties.getJmsProducerProperties() + FOR_PRODUCER_TAG + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create and start a connection to the JMS server + try { + connection = connectionFactory.createConnection(jmsProducerProperties.getSecurityPrincipal(), + jmsProducerProperties.getSecurityCredentials()); + connection.start(); + } catch (final Exception e) { + final String errorMessage = "connection to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create a JMS session for sending events + try { + jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (final Exception e) { + final String errorMessage = "creation of session to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJmsConsumerProperties() + FOR_PRODUCER_TAG + this.name + ")"; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Create a JMS message producer for sending events + try { + messageProducer = jmsSession.createProducer(jmsOutgoingTopic); + } catch (final Exception e) { + final String errorMessage = "creation of producer for sending events " + + "to JMS server failed for JMS properties \"" + + jmsProducerProperties.getJmsConsumerProperties() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service. + * parameters. eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.String, + * java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventname, final Object eventObject) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap + .get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Check if the object to be sent is serializable + if (!Serializable.class.isAssignableFrom(eventObject.getClass())) { + final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name + + ", object of type \"" + eventObject.getClass().getCanonicalName() + + "\" is not serializable"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + // The JMS message to send is constructed using the JMS session + Message jmsMessage = null; + + // Check the type of JMS message to send + if (jmsProducerProperties.isObjectMessageSending()) { + // We should send a JMS Object Message + try { + jmsMessage = jmsSession.createObjectMessage((Serializable) eventObject); + } catch (final Exception e) { + final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + + this.name + ", could not create JMS Object Message for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } else { + // We should send a JMS Text Message + try { + jmsMessage = jmsSession.createTextMessage(eventObject.toString()); + } catch (final Exception e) { + final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + + this.name + ", could not create JMS Text Message for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + try { + messageProducer.send(jmsMessage); + } catch (final Exception e) { + final String errorMessage = COULD_NOT_SEND_PREFIX + eventname + JMS_MESSAGE_PRODUCER_TAG + this.name + + ", send failed for object \"" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + // Close the message producer + try { + messageProducer.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close JMS message producer " + this.name + " for sending messages"; + LOGGER.warn(errorMessage, e); + } + + // Close the session + try { + jmsSession.close(); + } catch (final Exception e) { + final String errorMessage = "failed to close the JMS session for " + this.name + " for sending messages"; + LOGGER.warn(errorMessage, e); + } + + // Close the connection to the JMS server + try { + connection.close(); + } catch (final Exception e) { + final String errorMessage = "close of connection to the JMS server for " + this.name + " failed"; + LOGGER.warn(errorMessage, e); + } + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java deleted file mode 100644 index 80977b5d8..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JMSCarrierTechnologyParameters.java +++ /dev/null @@ -1,375 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.jms; - -import java.util.Properties; - -import javax.naming.Context; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.GroupValidationResult; -import org.onap.policy.common.parameters.ValidationStatus; - -/** - * Apex parameters for JMS as an event carrier technology. - *

- * The parameters for this plugin are: - *

    - *
  1. initialContextFactory: JMS uses a naming {@link Context} object to look up the locations of JMS servers and JMS - * topics. An Initial Context Factory is used to when creating a {@link Context} object that can be used for JMS - * lookups. The value of this parameter is passed to the {@link Context} with the label - * {@link Context#INITIAL_CONTEXT_FACTORY}. Its value must be the full canonical path to a class that implements the - * {@code javax.naming.spi.InitialContextFactory} interface. The parameter defaults to the string value - * {@code org.jboss.naming.remote.client.InitialContextFactory}. - *
  2. providerURL: The location of the server to use for naming context lookups. The value of this parameter is passed - * to the {@link Context} with the label {@link Context#PROVIDER_URL}. Its value must be a URL that identifies the JMS - * naming server. The parameter defaults to the string value {@code remote://localhost:4447}. - *
  3. securityPrincipal: The user name to use for JMS access. The value of this parameter is passed to the - * {@link Context} with the label {@link Context#SECURITY_PRINCIPAL}. Its value must be the user name of a user defined - * on the JMS server. The parameter defaults to the string value {@code userid}. - *
  4. securityCredentials:The password to use for JMS access. The value of this parameter is passed to the - * {@link Context} with the label {@link Context#SECURITY_CREDENTIALS}. Its value must be the password of a suer defined - * on the JMS server. The parameter defaults to the string value {@code password}. - *
  5. connectionFactory: JMS uses a {@link javax.jms.ConnectionFactory} instance to create connections towards a JMS - * server. The connection factory to use is held in the JMS {@link Context} object. This parameter specifies the label - * to use to look up the {@link javax.jms.ConnectionFactory} instance from the JMS {@link Context}. - *
  6. producerTopic: JMS uses a {@link javax.jms.Topic} instance to for sending and receiving messages. The topic to - * use for sending events to JMS from an Apex producer is held in the JMS {@link Context} object. This parameter - * specifies the label to use to look up the {@link javax.jms.Topic} instance in the JMS {@link Context} for the JMS - * server. The topic must, of course, also be defined on the JMS server. The parameter defaults to the string value - * {@code apex-out}. - *
  7. consumerTopic: The topic to use for receiving events from JMS in an Apex consumer is held in the JMS - * {@link Context} object. This parameter specifies the label to use to look up the {@link javax.jms.Topic} instance in - * the JMS {@link Context} for the JMS server. The topic must, of course, also be defined on the JMS server. The - * parameter defaults to the string value {@code apex-in}. - *
  8. consumerWaitTime: The amount of milliseconds a JMS consumer should wait between checks of its thread execution - * status. The parameter defaults to the long value {@code 100}. - *
  9. objectMessageSending: A flag that indicates whether Apex producers should send JMS messages as - * {@link javax.jms.ObjectMessage} instances for java objects (value {@code true}) or as {@link javax.jms.TextMessage} - * instances for strings (value {@code false}) . The parameter defaults to the boolean value {@code true}. - *
- * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class JMSCarrierTechnologyParameters extends CarrierTechnologyParameters { - /** The label of this carrier technology. */ - public static final String JMS_CARRIER_TECHNOLOGY_LABEL = "JMS"; - - /** The producer plugin class for the JMS carrier technology. */ - public static final String JMS_EVENT_PRODUCER_PLUGIN_CLASS = ApexJMSProducer.class.getCanonicalName(); - - /** The consumer plugin class for the JMS carrier technology. */ - public static final String JMS_EVENT_CONSUMER_PLUGIN_CLASS = ApexJMSConsumer.class.getCanonicalName(); - - // @formatter:off - - // Default parameter values - private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory"; - private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory"; - private static final String DEFAULT_PROVIDER_URL = "remote://localhost:4447"; - private static final String DEFAULT_SECURITY_PRINCIPAL = "userid"; - private static final String DEFAULT_SECURITY_CREDENTIALS = "password"; - private static final String DEFAULT_CONSUMER_TOPIC = "apex-in"; - private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; - private static final int DEFAULT_CONSUMER_WAIT_TIME = 100; - private static final boolean DEFAULT_TO_OBJECT_MESSAGE_SENDING = true; - - // Parameter property map tokens - private static final String PROPERTY_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY; - private static final String PROPERTY_PROVIDER_URL = Context.PROVIDER_URL; - private static final String PROPERTY_SECURITY_PRINCIPAL = Context.SECURITY_PRINCIPAL; - private static final String PROPERTY_SECURITY_CREDENTIALS = Context.SECURITY_CREDENTIALS; - - // JMS carrier parameters - private String connectionFactory = DEFAULT_CONNECTION_FACTORY; - private String initialContextFactory = DEFAULT_INITIAL_CONTEXT_FACTORY; - private String providerUrl = DEFAULT_PROVIDER_URL; - private String securityPrincipal = DEFAULT_SECURITY_PRINCIPAL; - private String securityCredentials = DEFAULT_SECURITY_CREDENTIALS; - private String producerTopic = DEFAULT_PRODUCER_TOPIC; - private String consumerTopic = DEFAULT_CONSUMER_TOPIC; - private int consumerWaitTime = DEFAULT_CONSUMER_WAIT_TIME; - private boolean objectMessageSending = DEFAULT_TO_OBJECT_MESSAGE_SENDING; - // @formatter:on - - /** - * Constructor to create a jms carrier technology parameters instance and register the instance with the parameter - * service. - */ - public JMSCarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the JMS carrier technology - this.setLabel(JMS_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(JMS_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(JMS_EVENT_CONSUMER_PLUGIN_CLASS); - } - - /** - * Gets the JMS producer properties. - * - * @return the JMS producer properties - */ - public Properties getJmsProducerProperties() { - return getJmsProperties(); - } - - /** - * Gets the jms consumer properties. - * - * @return the jms consumer properties - */ - public Properties getJmsConsumerProperties() { - return getJmsProperties(); - } - - /** - * Gets the JMS consumer properties. - * - * @return the jms consumer properties - */ - private Properties getJmsProperties() { - final Properties jmsProperties = new Properties(); - - jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory); - jmsProperties.put(PROPERTY_PROVIDER_URL, providerUrl); - jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal); - jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials); - - return jmsProperties; - } - - /** - * Gets the connection factory. - * - * @return the connection factory - */ - public String getConnectionFactory() { - return connectionFactory; - } - - /** - * Gets the initial context factory. - * - * @return the initial context factory - */ - public String getInitialContextFactory() { - return initialContextFactory; - } - - /** - * Gets the provider URL. - * - * @return the provider URL - */ - public String getProviderUrl() { - return providerUrl; - } - - /** - * Gets the security principal. - * - * @return the security principal - */ - public String getSecurityPrincipal() { - return securityPrincipal; - } - - /** - * Gets the security credentials. - * - * @return the security credentials - */ - public String getSecurityCredentials() { - return securityCredentials; - } - - /** - * Gets the producer topic. - * - * @return the producer topic - */ - public String getProducerTopic() { - return producerTopic; - } - - /** - * Gets the consumer topic. - * - * @return the consumer topic - */ - public String getConsumerTopic() { - return consumerTopic; - } - - /** - * Gets the consumer wait time. - * - * @return the consumer wait time - */ - public long getConsumerWaitTime() { - return consumerWaitTime; - } - - /** - * Sets the connection factory. - * - * @param connectionFactory the connection factory - */ - public void setConnectionFactory(final String connectionFactory) { - this.connectionFactory = connectionFactory; - } - - /** - * Sets the initial context factory. - * - * @param initialContextFactory the initial context factory - */ - public void setInitialContextFactory(final String initialContextFactory) { - this.initialContextFactory = initialContextFactory; - } - - /** - * Sets the provider URL. - * - * @param providerUrl the provider URL - */ - public void setProviderUrl(final String providerUrl) { - this.providerUrl = providerUrl; - } - - /** - * Sets the security principal. - * - * @param securityPrincipal the security principal - */ - public void setSecurityPrincipal(final String securityPrincipal) { - this.securityPrincipal = securityPrincipal; - } - - /** - * Sets the security credentials. - * - * @param securityCredentials the security credentials - */ - public void setSecurityCredentials(final String securityCredentials) { - this.securityCredentials = securityCredentials; - } - - /** - * Sets the producer topic. - * - * @param producerTopic the producer topic - */ - public void setProducerTopic(final String producerTopic) { - this.producerTopic = producerTopic; - } - - /** - * Sets the consumer topic. - * - * @param consumerTopic the consumer topic - */ - public void setConsumerTopic(final String consumerTopic) { - this.consumerTopic = consumerTopic; - } - - /** - * Sets the consumer wait time. - * - * @param consumerWaitTime the consumer wait time - */ - public void setConsumerWaitTime(final int consumerWaitTime) { - this.consumerWaitTime = consumerWaitTime; - } - - /** - * Checks if is object message sending. - * - * @return true, if checks if is object message sending - */ - public boolean isObjectMessageSending() { - return objectMessageSending; - } - - /** - * Sets the object message sending. - * - * @param objectMessageSending the object message sending - */ - public void setObjectMessageSending(final boolean objectMessageSending) { - this.objectMessageSending = objectMessageSending; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() - */ - @Override - public GroupValidationResult validate() { - final GroupValidationResult result = super.validate(); - - if (initialContextFactory == null || initialContextFactory.trim().length() == 0) { - result.setResult("initialContextFactory", ValidationStatus.INVALID, - "initialContextFactory must be specified as a string that is a class that implements the " - + "interface org.jboss.naming.remote.client.InitialContextFactory"); - } - - if (providerUrl == null || providerUrl.trim().length() == 0) { - result.setResult("providerUrl", ValidationStatus.INVALID, - "providerUrl must be specified as a URL string that specifies the location of " - + "configuration information for the service provider to use " - + "such as remote://localhost:4447"); - } - - if (securityPrincipal == null || securityPrincipal.trim().length() == 0) { - result.setResult("securityPrincipal", ValidationStatus.INVALID, - "securityPrincipal must be specified the identity of the principal for authenticating " - + "the caller to the service"); - } - - if (securityCredentials == null || securityCredentials.trim().length() == 0) { - result.setResult("securityCredentials", ValidationStatus.INVALID, - " securityCredentials must be specified as the credentials of the " - + "principal for authenticating the caller to the service"); - } - - if (producerTopic == null || producerTopic.trim().length() == 0) { - result.setResult("producerTopic", ValidationStatus.INVALID, - " producerTopic must be a string that identifies the JMS topic " - + "on which Apex will send events"); - } - - if (consumerTopic == null || consumerTopic.trim().length() == 0) { - result.setResult("consumerTopic", ValidationStatus.INVALID, - " consumerTopic must be a string that identifies the JMS topic " - + "on which Apex will recieve events"); - } - - if (consumerWaitTime < 0) { - result.setResult("consumerWaitTime", ValidationStatus.INVALID, - "[" + consumerWaitTime + "] invalid, must be specified as consumerWaitTime >= 0"); - } - - return result; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java new file mode 100644 index 000000000..8b120ca3a --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java @@ -0,0 +1,375 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.jms; + +import java.util.Properties; + +import javax.naming.Context; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationStatus; + +/** + * Apex parameters for JMS as an event carrier technology. + * + *

The parameters for this plugin are: + *

    + *
  1. initialContextFactory: JMS uses a naming {@link Context} object to look up the locations of JMS servers and JMS + * topics. An Initial Context Factory is used to when creating a {@link Context} object that can be used for JMS + * lookups. The value of this parameter is passed to the {@link Context} with the label + * {@link Context#INITIAL_CONTEXT_FACTORY}. Its value must be the full canonical path to a class that implements the + * {@code javax.naming.spi.InitialContextFactory} interface. The parameter defaults to the string value + * {@code org.jboss.naming.remote.client.InitialContextFactory}. + *
  2. providerURL: The location of the server to use for naming context lookups. The value of this parameter is passed + * to the {@link Context} with the label {@link Context#PROVIDER_URL}. Its value must be a URL that identifies the JMS + * naming server. The parameter defaults to the string value {@code remote://localhost:4447}. + *
  3. securityPrincipal: The user name to use for JMS access. The value of this parameter is passed to the + * {@link Context} with the label {@link Context#SECURITY_PRINCIPAL}. Its value must be the user name of a user defined + * on the JMS server. The parameter defaults to the string value {@code userid}. + *
  4. securityCredentials:The password to use for JMS access. The value of this parameter is passed to the + * {@link Context} with the label {@link Context#SECURITY_CREDENTIALS}. Its value must be the password of a suer defined + * on the JMS server. The parameter defaults to the string value {@code password}. + *
  5. connectionFactory: JMS uses a {@link javax.jms.ConnectionFactory} instance to create connections towards a JMS + * server. The connection factory to use is held in the JMS {@link Context} object. This parameter specifies the label + * to use to look up the {@link javax.jms.ConnectionFactory} instance from the JMS {@link Context}. + *
  6. producerTopic: JMS uses a {@link javax.jms.Topic} instance to for sending and receiving messages. The topic to + * use for sending events to JMS from an Apex producer is held in the JMS {@link Context} object. This parameter + * specifies the label to use to look up the {@link javax.jms.Topic} instance in the JMS {@link Context} for the JMS + * server. The topic must, of course, also be defined on the JMS server. The parameter defaults to the string value + * {@code apex-out}. + *
  7. consumerTopic: The topic to use for receiving events from JMS in an Apex consumer is held in the JMS + * {@link Context} object. This parameter specifies the label to use to look up the {@link javax.jms.Topic} instance in + * the JMS {@link Context} for the JMS server. The topic must, of course, also be defined on the JMS server. The + * parameter defaults to the string value {@code apex-in}. + *
  8. consumerWaitTime: The amount of milliseconds a JMS consumer should wait between checks of its thread execution + * status. The parameter defaults to the long value {@code 100}. + *
  9. objectMessageSending: A flag that indicates whether Apex producers should send JMS messages as + * {@link javax.jms.ObjectMessage} instances for java objects (value {@code true}) or as {@link javax.jms.TextMessage} + * instances for strings (value {@code false}) . The parameter defaults to the boolean value {@code true}. + *
+ * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class JmsCarrierTechnologyParameters extends CarrierTechnologyParameters { + /** The label of this carrier technology. */ + public static final String JMS_CARRIER_TECHNOLOGY_LABEL = "JMS"; + + /** The producer plugin class for the JMS carrier technology. */ + public static final String JMS_EVENT_PRODUCER_PLUGIN_CLASS = ApexJmsProducer.class.getCanonicalName(); + + /** The consumer plugin class for the JMS carrier technology. */ + public static final String JMS_EVENT_CONSUMER_PLUGIN_CLASS = ApexJmsConsumer.class.getCanonicalName(); + + // @formatter:off + + // Default parameter values + private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory"; + private static final String DEFAULT_INITIAL_CTXT_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory"; + private static final String DEFAULT_PROVIDER_URL = "remote://localhost:4447"; + private static final String DEFAULT_SECURITY_PRINCIPAL = "userid"; + private static final String DEFAULT_SECURITY_CREDENTIALS = "password"; + private static final String DEFAULT_CONSUMER_TOPIC = "apex-in"; + private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; + private static final int DEFAULT_CONSUMER_WAIT_TIME = 100; + private static final boolean DEFAULT_TO_OBJECT_MSG_SENDING = true; + + // Parameter property map tokens + private static final String PROPERTY_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY; + private static final String PROPERTY_PROVIDER_URL = Context.PROVIDER_URL; + private static final String PROPERTY_SECURITY_PRINCIPAL = Context.SECURITY_PRINCIPAL; + private static final String PROPERTY_SECURITY_CREDENTIALS = Context.SECURITY_CREDENTIALS; + + // JMS carrier parameters + private String connectionFactory = DEFAULT_CONNECTION_FACTORY; + private String initialContextFactory = DEFAULT_INITIAL_CTXT_FACTORY; + private String providerUrl = DEFAULT_PROVIDER_URL; + private String securityPrincipal = DEFAULT_SECURITY_PRINCIPAL; + private String securityCredentials = DEFAULT_SECURITY_CREDENTIALS; + private String producerTopic = DEFAULT_PRODUCER_TOPIC; + private String consumerTopic = DEFAULT_CONSUMER_TOPIC; + private int consumerWaitTime = DEFAULT_CONSUMER_WAIT_TIME; + private boolean objectMessageSending = DEFAULT_TO_OBJECT_MSG_SENDING; + // @formatter:on + + /** + * Constructor to create a jms carrier technology parameters instance and register the instance with the parameter + * service. + */ + public JmsCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the JMS carrier technology + this.setLabel(JMS_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(JMS_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(JMS_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the JMS producer properties. + * + * @return the JMS producer properties + */ + public Properties getJmsProducerProperties() { + return getJmsProperties(); + } + + /** + * Gets the jms consumer properties. + * + * @return the jms consumer properties + */ + public Properties getJmsConsumerProperties() { + return getJmsProperties(); + } + + /** + * Gets the JMS consumer properties. + * + * @return the jms consumer properties + */ + private Properties getJmsProperties() { + final Properties jmsProperties = new Properties(); + + jmsProperties.put(PROPERTY_INITIAL_CONTEXT_FACTORY, initialContextFactory); + jmsProperties.put(PROPERTY_PROVIDER_URL, providerUrl); + jmsProperties.put(PROPERTY_SECURITY_PRINCIPAL, securityPrincipal); + jmsProperties.put(PROPERTY_SECURITY_CREDENTIALS, securityCredentials); + + return jmsProperties; + } + + /** + * Gets the connection factory. + * + * @return the connection factory + */ + public String getConnectionFactory() { + return connectionFactory; + } + + /** + * Gets the initial context factory. + * + * @return the initial context factory + */ + public String getInitialContextFactory() { + return initialContextFactory; + } + + /** + * Gets the provider URL. + * + * @return the provider URL + */ + public String getProviderUrl() { + return providerUrl; + } + + /** + * Gets the security principal. + * + * @return the security principal + */ + public String getSecurityPrincipal() { + return securityPrincipal; + } + + /** + * Gets the security credentials. + * + * @return the security credentials + */ + public String getSecurityCredentials() { + return securityCredentials; + } + + /** + * Gets the producer topic. + * + * @return the producer topic + */ + public String getProducerTopic() { + return producerTopic; + } + + /** + * Gets the consumer topic. + * + * @return the consumer topic + */ + public String getConsumerTopic() { + return consumerTopic; + } + + /** + * Gets the consumer wait time. + * + * @return the consumer wait time + */ + public long getConsumerWaitTime() { + return consumerWaitTime; + } + + /** + * Sets the connection factory. + * + * @param connectionFactory the connection factory + */ + public void setConnectionFactory(final String connectionFactory) { + this.connectionFactory = connectionFactory; + } + + /** + * Sets the initial context factory. + * + * @param initialContextFactory the initial context factory + */ + public void setInitialContextFactory(final String initialContextFactory) { + this.initialContextFactory = initialContextFactory; + } + + /** + * Sets the provider URL. + * + * @param providerUrl the provider URL + */ + public void setProviderUrl(final String providerUrl) { + this.providerUrl = providerUrl; + } + + /** + * Sets the security principal. + * + * @param securityPrincipal the security principal + */ + public void setSecurityPrincipal(final String securityPrincipal) { + this.securityPrincipal = securityPrincipal; + } + + /** + * Sets the security credentials. + * + * @param securityCredentials the security credentials + */ + public void setSecurityCredentials(final String securityCredentials) { + this.securityCredentials = securityCredentials; + } + + /** + * Sets the producer topic. + * + * @param producerTopic the producer topic + */ + public void setProducerTopic(final String producerTopic) { + this.producerTopic = producerTopic; + } + + /** + * Sets the consumer topic. + * + * @param consumerTopic the consumer topic + */ + public void setConsumerTopic(final String consumerTopic) { + this.consumerTopic = consumerTopic; + } + + /** + * Sets the consumer wait time. + * + * @param consumerWaitTime the consumer wait time + */ + public void setConsumerWaitTime(final int consumerWaitTime) { + this.consumerWaitTime = consumerWaitTime; + } + + /** + * Checks if is object message sending. + * + * @return true, if checks if is object message sending + */ + public boolean isObjectMessageSending() { + return objectMessageSending; + } + + /** + * Sets the object message sending. + * + * @param objectMessageSending the object message sending + */ + public void setObjectMessageSending(final boolean objectMessageSending) { + this.objectMessageSending = objectMessageSending; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public GroupValidationResult validate() { + final GroupValidationResult result = super.validate(); + + if (initialContextFactory == null || initialContextFactory.trim().length() == 0) { + result.setResult("initialContextFactory", ValidationStatus.INVALID, + "initialContextFactory must be specified as a string that is a class that implements the " + + "interface org.jboss.naming.remote.client.InitialContextFactory"); + } + + if (providerUrl == null || providerUrl.trim().length() == 0) { + result.setResult("providerUrl", ValidationStatus.INVALID, + "providerUrl must be specified as a URL string that specifies the location of " + + "configuration information for the service provider to use " + + "such as remote://localhost:4447"); + } + + if (securityPrincipal == null || securityPrincipal.trim().length() == 0) { + result.setResult("securityPrincipal", ValidationStatus.INVALID, + "securityPrincipal must be specified the identity of the principal for authenticating " + + "the caller to the service"); + } + + if (securityCredentials == null || securityCredentials.trim().length() == 0) { + result.setResult("securityCredentials", ValidationStatus.INVALID, + " securityCredentials must be specified as the credentials of the " + + "principal for authenticating the caller to the service"); + } + + if (producerTopic == null || producerTopic.trim().length() == 0) { + result.setResult("producerTopic", ValidationStatus.INVALID, + " producerTopic must be a string that identifies the JMS topic " + + "on which Apex will send events"); + } + + if (consumerTopic == null || consumerTopic.trim().length() == 0) { + result.setResult("consumerTopic", ValidationStatus.INVALID, + " consumerTopic must be a string that identifies the JMS topic " + + "on which Apex will recieve events"); + } + + if (consumerWaitTime < 0) { + result.setResult("consumerWaitTime", ValidationStatus.INVALID, + "[" + consumerWaitTime + "] invalid, must be specified as consumerWaitTime >= 0"); + } + + return result; + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 3351a58e9..dfb12617c 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -47,7 +47,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class); // The Kafka parameters read from the parameter service - private KAFKACarrierTechnologyParameters kafkaConsumerProperties; + private KafkaCarrierTechnologyParameters kafkaConsumerProperties; // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; @@ -79,7 +79,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { this.name = consumerName; // Check and get the Kafka Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { LOGGER.warn("specified consumer properties of type \"" + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + "\" are not applicable to a Kafka consumer"); @@ -88,10 +88,10 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { + "\" are not applicable to a Kafka consumer"); } kafkaConsumerProperties = - (KAFKACarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " @@ -153,7 +153,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index fb851bc70..c83c0ae1e 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -46,7 +46,7 @@ public class ApexKafkaProducer implements ApexEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); // The Kafka parameters read from the parameter service - private KAFKACarrierTechnologyParameters kafkaProducerProperties; + private KafkaCarrierTechnologyParameters kafkaProducerProperties; // The Kafka Producer used to send events using Kafka private Producer kafkaProducer; @@ -63,13 +63,14 @@ public class ApexKafkaProducer implements ApexEventProducer { this.name = producerName; // Check and get the Kafka Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { - LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { + String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"; + LOGGER.warn(message); throw new ApexEventException( - "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + message); } kafkaProducerProperties = - (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); } /* @@ -122,7 +123,7 @@ public class ApexKafkaProducer implements ApexEventProducer { // Kafka producer must be started in the same thread as it is stopped, so we must start it here if (kafkaProducer == null) { // Kick off the Kafka producer - kafkaProducer = new KafkaProducer(kafkaProducerProperties.getKafkaProducerProperties()); + kafkaProducer = new KafkaProducer<>(kafkaProducerProperties.getKafkaProducerProperties()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event producer " + this.name + " is ready to send to topics: " + kafkaProducerProperties.getProducerTopic()); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java deleted file mode 100644 index 5ce96662e..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java +++ /dev/null @@ -1,419 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.kafka; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Properties; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.GroupValidationResult; -import org.onap.policy.common.parameters.ValidationStatus; - -/** - * Apex parameters for Kafka as an event carrier technology. - * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters { - // @formatter:off - /** The label of this carrier technology. */ - public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA"; - - /** The producer plugin class for the Kafka carrier technology. */ - public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName(); - - /** The consumer plugin class for the Kafka carrier technology. */ - public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName(); - - // Repeated strings in messages - private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; - - // Default parameter values - private static final String DEFAULT_ACKS = "all"; - private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; - private static final int DEFAULT_RETRIES = 0; - private static final int DEFAULT_BATCH_SIZE = 16384; - private static final int DEFAULT_LINGER_TIME = 1; - private static final long DEFAULT_BUFFER_MEMORY = 33554432; - private static final String DEFAULT_GROUP_ID = "default-group-id"; - private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true; - private static final int DEFAULT_AUTO_COMMIT_TIME = 1000; - private static final int DEFAULT_SESSION_TIMEOUT = 30000; - private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; - private static final int DEFAULT_CONSUMER_POLL_TIME = 100; - private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"}; - private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - - // Parameter property map tokens - private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; - private static final String PROPERTY_ACKS = "acks"; - private static final String PROPERTY_RETRIES = "retries"; - private static final String PROPERTY_BATCH_SIZE = "batch.size"; - private static final String PROPERTY_LINGER_TIME = "linger.ms"; - private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory"; - private static final String PROPERTY_GROUP_ID = "group.id"; - private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit"; - private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms"; - private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms"; - private static final String PROPERTY_KEY_SERIALIZER = "key.serializer"; - private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer"; - private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer"; - private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; - - // kafka carrier parameters - private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; - private String acks = DEFAULT_ACKS; - private int retries = DEFAULT_RETRIES; - private int batchSize = DEFAULT_BATCH_SIZE; - private int lingerTime = DEFAULT_LINGER_TIME; - private long bufferMemory = DEFAULT_BUFFER_MEMORY; - private String groupId = DEFAULT_GROUP_ID; - private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT; - private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME; - private int sessionTimeout = DEFAULT_SESSION_TIMEOUT; - private String producerTopic = DEFAULT_PRODUCER_TOPIC; - private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME; - private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST; - private String keySerializer = DEFAULT_KEY_SERIALIZER; - private String valueSerializer = DEFAULT_VALUE_SERIALIZER; - private String keyDeserializer = DEFAULT_KEY_DESERIALIZER; - private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER; - // @formatter:on - - /** - * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter - * service. - */ - public KAFKACarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the kafka carrier technology - this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS); - } - - /** - * Gets the kafka producer properties. - * - * @return the kafka producer properties - */ - public Properties getKafkaProducerProperties() { - final Properties kafkaProperties = new Properties(); - - kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - kafkaProperties.put(PROPERTY_ACKS, acks); - kafkaProperties.put(PROPERTY_RETRIES, retries); - kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); - kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); - kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); - kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); - kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); - - return kafkaProperties; - } - - /** - * Gets the kafka consumer properties. - * - * @return the kafka consumer properties - */ - public Properties getKafkaConsumerProperties() { - final Properties kafkaProperties = new Properties(); - - kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - kafkaProperties.put(PROPERTY_GROUP_ID, groupId); - kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); - kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); - kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); - kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); - kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); - - return kafkaProperties; - } - - /** - * Gets the bootstrap servers. - * - * @return the bootstrap servers - */ - public String getBootstrapServers() { - return bootstrapServers; - } - - /** - * Gets the acks. - * - * @return the acks - */ - public String getAcks() { - return acks; - } - - /** - * Gets the retries. - * - * @return the retries - */ - public int getRetries() { - return retries; - } - - /** - * Gets the batch size. - * - * @return the batch size - */ - public int getBatchSize() { - return batchSize; - } - - /** - * Gets the linger time. - * - * @return the linger time - */ - public int getLingerTime() { - return lingerTime; - } - - /** - * Gets the buffer memory. - * - * @return the buffer memory - */ - public long getBufferMemory() { - return bufferMemory; - } - - /** - * Gets the group id. - * - * @return the group id - */ - public String getGroupId() { - return groupId; - } - - /** - * Checks if is enable auto commit. - * - * @return true, if checks if is enable auto commit - */ - public boolean isEnableAutoCommit() { - return enableAutoCommit; - } - - /** - * Gets the auto commit time. - * - * @return the auto commit time - */ - public int getAutoCommitTime() { - return autoCommitTime; - } - - /** - * Gets the session timeout. - * - * @return the session timeout - */ - public int getSessionTimeout() { - return sessionTimeout; - } - - /** - * Gets the producer topic. - * - * @return the producer topic - */ - public String getProducerTopic() { - return producerTopic; - } - - /** - * Gets the consumer poll time. - * - * @return the consumer poll time - */ - public long getConsumerPollTime() { - return consumerPollTime; - } - - /** - * Gets the consumer topic list. - * - * @return the consumer topic list - */ - public Collection getConsumerTopicList() { - return Arrays.asList(consumerTopicList); - } - - /** - * Gets the key serializer. - * - * @return the key serializer - */ - public String getKeySerializer() { - return keySerializer; - } - - /** - * Gets the value serializer. - * - * @return the value serializer - */ - public String getValueSerializer() { - return valueSerializer; - } - - /** - * Gets the key deserializer. - * - * @return the key deserializer - */ - public String getKeyDeserializer() { - return keyDeserializer; - } - - /** - * Gets the value deserializer. - * - * @return the value deserializer - */ - public String getValueDeserializer() { - return valueDeserializer; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() - */ - @Override - public GroupValidationResult validate() { - final GroupValidationResult result = super.validate(); - - if (isNullOrBlank(bootstrapServers)) { - result.setResult("bootstrapServers", ValidationStatus.INVALID, - "not specified, must be specified as a string of form host:port"); - } - - if (isNullOrBlank(acks)) { - result.setResult("acks", ValidationStatus.INVALID, - "not specified, must be specified as a string with values [0|1|all]"); - } - - if (retries < 0) { - result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, - "[" + retries + "] invalid, must be specified as retries >= 0"); - } - - if (batchSize < 0) { - result.setResult("batchSize", ValidationStatus.INVALID, - "[" + batchSize + "] invalid, must be specified as batchSize >= 0"); - } - - if (lingerTime < 0) { - result.setResult("lingerTime", ValidationStatus.INVALID, - "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0"); - } - - if (bufferMemory < 0) { - result.setResult("bufferMemory", ValidationStatus.INVALID, - "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); - } - - if (isNullOrBlank(groupId)) { - result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); - } - - if (autoCommitTime < 0) { - result.setResult("autoCommitTime", ValidationStatus.INVALID, - "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); - } - - if (sessionTimeout < 0) { - result.setResult("sessionTimeout", ValidationStatus.INVALID, - "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); - } - - if (isNullOrBlank(producerTopic)) { - result.setResult("producerTopic", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (consumerPollTime < 0) { - result.setResult("consumerPollTime", ValidationStatus.INVALID, - "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); - } - - validateConsumerTopicList(result); - - if (isNullOrBlank(keySerializer)) { - result.setResult("keySerializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(valueSerializer)) { - result.setResult("valueSerializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(keyDeserializer)) { - result.setResult("keyDeserializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(valueDeserializer)) { - result.setResult("valueDeserializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - return result; - } - - private void validateConsumerTopicList(final GroupValidationResult result) { - if (consumerTopicList == null || consumerTopicList.length == 0) { - result.setResult("consumerTopicList", ValidationStatus.INVALID, - "not specified, must be specified as a list of strings"); - } - - StringBuilder consumerTopicStringBuilder = new StringBuilder(); - for (final String consumerTopic : consumerTopicList) { - if (consumerTopic == null || consumerTopic.trim().length() == 0) { - consumerTopicStringBuilder.append(consumerTopic + "/"); - } - } - if (consumerTopicStringBuilder.length() > 0) { - result.setResult("consumerTopicList", ValidationStatus.INVALID, - "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString()); - } - } - - private boolean isNullOrBlank(final String stringValue) { - return stringValue == null || stringValue.trim().length() == 0; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java new file mode 100644 index 000000000..9d7cc77f3 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -0,0 +1,419 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationStatus; + +/** + * Apex parameters for Kafka as an event carrier technology. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA"; + + /** The producer plugin class for the Kafka carrier technology. */ + public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName(); + + /** The consumer plugin class for the Kafka carrier technology. */ + public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName(); + + // Repeated strings in messages + private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; + + // Default parameter values + private static final String DEFAULT_ACKS = "all"; + private static final String DEFAULT_BOOT_SERVERS = "localhost:9092"; + private static final int DEFAULT_RETRIES = 0; + private static final int DEFAULT_BATCH_SIZE = 16384; + private static final int DEFAULT_LINGER_TIME = 1; + private static final long DEFAULT_BUFFER_MEMORY = 33554432; + private static final String DEFAULT_GROUP_ID = "default-group-id"; + private static final boolean DEFAULT_ENABLE_AUTOCMIT = true; + private static final int DEFAULT_AUTO_COMMIT_TIME = 1000; + private static final int DEFAULT_SESSION_TIMEOUT = 30000; + private static final String DEFAULT_PROD_TOPIC = "apex-out"; + private static final int DEFAULT_CONS_POLL_TIME = 100; + private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; + private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + + // Parameter property map tokens + private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; + private static final String PROPERTY_ACKS = "acks"; + private static final String PROPERTY_RETRIES = "retries"; + private static final String PROPERTY_BATCH_SIZE = "batch.size"; + private static final String PROPERTY_LINGER_TIME = "linger.ms"; + private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory"; + private static final String PROPERTY_GROUP_ID = "group.id"; + private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit"; + private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms"; + private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms"; + private static final String PROPERTY_KEY_SERIALIZER = "key.serializer"; + private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer"; + private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer"; + private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; + + // kafka carrier parameters + private String bootstrapServers = DEFAULT_BOOT_SERVERS; + private String acks = DEFAULT_ACKS; + private int retries = DEFAULT_RETRIES; + private int batchSize = DEFAULT_BATCH_SIZE; + private int lingerTime = DEFAULT_LINGER_TIME; + private long bufferMemory = DEFAULT_BUFFER_MEMORY; + private String groupId = DEFAULT_GROUP_ID; + private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT; + private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME; + private int sessionTimeout = DEFAULT_SESSION_TIMEOUT; + private String producerTopic = DEFAULT_PROD_TOPIC; + private int consumerPollTime = DEFAULT_CONS_POLL_TIME; + private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; + private String keySerializer = DEFAULT_KEY_SERZER; + private String valueSerializer = DEFAULT_VAL_SERZER; + private String keyDeserializer = DEFAULT_KEY_DESZER; + private String valueDeserializer = DEFAULT_VALUE_DESZER; + // @formatter:on + + /** + * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter + * service. + */ + public KafkaCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the kafka carrier technology + this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the kafka producer properties. + * + * @return the kafka producer properties + */ + public Properties getKafkaProducerProperties() { + final Properties kafkaProperties = new Properties(); + + kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + kafkaProperties.put(PROPERTY_ACKS, acks); + kafkaProperties.put(PROPERTY_RETRIES, retries); + kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); + kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); + kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); + kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); + kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); + + return kafkaProperties; + } + + /** + * Gets the kafka consumer properties. + * + * @return the kafka consumer properties + */ + public Properties getKafkaConsumerProperties() { + final Properties kafkaProperties = new Properties(); + + kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + kafkaProperties.put(PROPERTY_GROUP_ID, groupId); + kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); + kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); + kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); + kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); + kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); + + return kafkaProperties; + } + + /** + * Gets the bootstrap servers. + * + * @return the bootstrap servers + */ + public String getBootstrapServers() { + return bootstrapServers; + } + + /** + * Gets the acks. + * + * @return the acks + */ + public String getAcks() { + return acks; + } + + /** + * Gets the retries. + * + * @return the retries + */ + public int getRetries() { + return retries; + } + + /** + * Gets the batch size. + * + * @return the batch size + */ + public int getBatchSize() { + return batchSize; + } + + /** + * Gets the linger time. + * + * @return the linger time + */ + public int getLingerTime() { + return lingerTime; + } + + /** + * Gets the buffer memory. + * + * @return the buffer memory + */ + public long getBufferMemory() { + return bufferMemory; + } + + /** + * Gets the group id. + * + * @return the group id + */ + public String getGroupId() { + return groupId; + } + + /** + * Checks if is enable auto commit. + * + * @return true, if checks if is enable auto commit + */ + public boolean isEnableAutoCommit() { + return enableAutoCommit; + } + + /** + * Gets the auto commit time. + * + * @return the auto commit time + */ + public int getAutoCommitTime() { + return autoCommitTime; + } + + /** + * Gets the session timeout. + * + * @return the session timeout + */ + public int getSessionTimeout() { + return sessionTimeout; + } + + /** + * Gets the producer topic. + * + * @return the producer topic + */ + public String getProducerTopic() { + return producerTopic; + } + + /** + * Gets the consumer poll time. + * + * @return the consumer poll time + */ + public long getConsumerPollTime() { + return consumerPollTime; + } + + /** + * Gets the consumer topic list. + * + * @return the consumer topic list + */ + public Collection getConsumerTopicList() { + return Arrays.asList(consumerTopicList); + } + + /** + * Gets the key serializer. + * + * @return the key serializer + */ + public String getKeySerializer() { + return keySerializer; + } + + /** + * Gets the value serializer. + * + * @return the value serializer + */ + public String getValueSerializer() { + return valueSerializer; + } + + /** + * Gets the key deserializer. + * + * @return the key deserializer + */ + public String getKeyDeserializer() { + return keyDeserializer; + } + + /** + * Gets the value deserializer. + * + * @return the value deserializer + */ + public String getValueDeserializer() { + return valueDeserializer; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public GroupValidationResult validate() { + final GroupValidationResult result = super.validate(); + + if (isNullOrBlank(bootstrapServers)) { + result.setResult("bootstrapServers", ValidationStatus.INVALID, + "not specified, must be specified as a string of form host:port"); + } + + if (isNullOrBlank(acks)) { + result.setResult("acks", ValidationStatus.INVALID, + "not specified, must be specified as a string with values [0|1|all]"); + } + + if (retries < 0) { + result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, + "[" + retries + "] invalid, must be specified as retries >= 0"); + } + + if (batchSize < 0) { + result.setResult("batchSize", ValidationStatus.INVALID, + "[" + batchSize + "] invalid, must be specified as batchSize >= 0"); + } + + if (lingerTime < 0) { + result.setResult("lingerTime", ValidationStatus.INVALID, + "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0"); + } + + if (bufferMemory < 0) { + result.setResult("bufferMemory", ValidationStatus.INVALID, + "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); + } + + if (isNullOrBlank(groupId)) { + result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); + } + + if (autoCommitTime < 0) { + result.setResult("autoCommitTime", ValidationStatus.INVALID, + "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); + } + + if (sessionTimeout < 0) { + result.setResult("sessionTimeout", ValidationStatus.INVALID, + "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); + } + + if (isNullOrBlank(producerTopic)) { + result.setResult("producerTopic", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + + if (consumerPollTime < 0) { + result.setResult("consumerPollTime", ValidationStatus.INVALID, + "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); + } + + validateConsumerTopicList(result); + + if (isNullOrBlank(keySerializer)) { + result.setResult("keySerializer", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(valueSerializer)) { + result.setResult("valueSerializer", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(keyDeserializer)) { + result.setResult("keyDeserializer", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(valueDeserializer)) { + result.setResult("valueDeserializer", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + + return result; + } + + private void validateConsumerTopicList(final GroupValidationResult result) { + if (consumerTopicList == null || consumerTopicList.length == 0) { + result.setResult("consumerTopicList", ValidationStatus.INVALID, + "not specified, must be specified as a list of strings"); + } + + StringBuilder consumerTopicStringBuilder = new StringBuilder(); + for (final String consumerTopic : consumerTopicList) { + if (consumerTopic == null || consumerTopic.trim().length() == 0) { + consumerTopicStringBuilder.append(consumerTopic + "/"); + } + } + if (consumerTopicStringBuilder.length() > 0) { + result.setResult("consumerTopicList", ValidationStatus.INVALID, + "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString()); + } + } + + private boolean isNullOrBlank(final String stringValue) { + return stringValue == null || stringValue.trim().length() == 0; + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java index af5d4d861..13edea8a6 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientConsumer.java @@ -52,7 +52,7 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { private static final long REST_CLIENT_WAIT_SLEEP_TIME = 50; // The REST parameters read from the parameter service - private RESTClientCarrierTechnologyParameters restConsumerProperties; + private RestClientCarrierTechnologyParameters restConsumerProperties; // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; @@ -77,22 +77,22 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { this.name = consumerName; // Check and get the REST Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RESTClientCarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestClientCarrierTechnologyParameters)) { final String errorMessage = "specified consumer properties are not applicable to REST client consumer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restConsumerProperties = (RESTClientCarrierTechnologyParameters) consumerParameters + restConsumerProperties = (RestClientCarrierTechnologyParameters) consumerParameters .getCarrierTechnologyParameters(); // Check if the HTTP method has been set if (restConsumerProperties.getHttpMethod() == null) { - restConsumerProperties.setHttpMethod(RESTClientCarrierTechnologyParameters.CONSUMER_HTTP_METHOD); + restConsumerProperties.setHttpMethod(RestClientCarrierTechnologyParameters.CONSUMER_HTTP_METHOD); } if (!restConsumerProperties.getHttpMethod() - .equalsIgnoreCase(RESTClientCarrierTechnologyParameters.CONSUMER_HTTP_METHOD)) { + .equalsIgnoreCase(RestClientCarrierTechnologyParameters.CONSUMER_HTTP_METHOD)) { final String errorMessage = "specified HTTP method of \"" + restConsumerProperties.getHttpMethod() + "\" is invalid, only HTTP method \"GET\" " + "is supported for event reception on REST client consumer (" + this.name + ")"; @@ -214,17 +214,17 @@ public class ApexRestClientConsumer implements ApexEventConsumer, Runnable { } // Get the event we received - final String eventJSONString = response.readEntity(String.class); + final String eventJsonString = response.readEntity(String.class); // Check there is content - if (eventJSONString == null || eventJSONString.trim().length() == 0) { + if (eventJsonString == null || eventJsonString.trim().length() == 0) { final String errorMessage = "received an empty event from URL \"" + restConsumerProperties.getUrl() + "\""; throw new ApexEventRuntimeException(errorMessage); } // Send the event into Apex - eventReceiver.receiveEvent(eventJSONString); + eventReceiver.receiveEvent(eventJsonString); } catch (final Exception e) { LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java index 2765fe9e7..82c3cf331 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/ApexRestClientProducer.java @@ -51,7 +51,7 @@ public class ApexRestClientProducer implements ApexEventProducer { private Client client; // The REST carrier properties - private RESTClientCarrierTechnologyParameters restProducerProperties; + private RestClientCarrierTechnologyParameters restProducerProperties; // The name for this producer private String name = null; @@ -67,29 +67,29 @@ public class ApexRestClientProducer implements ApexEventProducer { */ @Override public void init(final String producerName, final EventHandlerParameters producerParameters) - throws ApexEventException { + throws ApexEventException { this.name = producerName; // Check and get the REST Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof RESTClientCarrierTechnologyParameters)) { - final String errorMessage = - "specified consumer properties are not applicable to REST client producer (" + this.name + ")"; + if (!(producerParameters.getCarrierTechnologyParameters() instanceof RestClientCarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties are not applicable to REST client producer (" + + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restProducerProperties = - (RESTClientCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + restProducerProperties = (RestClientCarrierTechnologyParameters) producerParameters + .getCarrierTechnologyParameters(); // Check if the HTTP method has been set if (restProducerProperties.getHttpMethod() == null) { - restProducerProperties.setHttpMethod(RESTClientCarrierTechnologyParameters.DEFAULT_PRODUCER_HTTP_METHOD); + restProducerProperties.setHttpMethod(RestClientCarrierTechnologyParameters.DEFAULT_PRODUCER_HTTP_METHOD); } if (!restProducerProperties.getHttpMethod().equalsIgnoreCase("POST") - && !restProducerProperties.getHttpMethod().equalsIgnoreCase("PUT")) { + && !restProducerProperties.getHttpMethod().equalsIgnoreCase("PUT")) { final String errorMessage = "specified HTTP method of \"" + restProducerProperties.getHttpMethod() - + "\" is invalid, only HTTP methods \"POST\" and \"PUT\" are supproted for event sending on REST client producer (" - + this.name + ")"; + + "\" is invalid, only HTTP methods \"POST\" and \"PUT\" are supproted " + + "for event sending on REST client producer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } @@ -139,8 +139,8 @@ public class ApexRestClientProducer implements ApexEventProducer { @Override public void sendEvent(final long executionId, final String eventName, final Object event) { // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap + .get(EventHandlerPeeredMode.SYNCHRONOUS); if (synchronousEventCache != null) { synchronousEventCache.removeCachedEventToApexIfExists(executionId); } @@ -151,15 +151,16 @@ public class ApexRestClientProducer implements ApexEventProducer { // Check that the request worked if (response.getStatus() != Response.Status.OK.getStatusCode()) { final String errorMessage = "send of event to URL \"" + restProducerProperties.getUrl() + "\" using HTTP \"" - + restProducerProperties.getHttpMethod() + "\" failed with status code " + response.getStatus() - + " and message \"" + response.readEntity(String.class) + "\", event:\n" + event; + + restProducerProperties.getHttpMethod() + "\" failed with status code " + + response.getStatus() + " and message \"" + response.readEntity(String.class) + + "\", event:\n" + event; LOGGER.warn(errorMessage); throw new ApexEventRuntimeException(errorMessage); } if (LOGGER.isTraceEnabled()) { LOGGER.trace("event sent from engine using {} to URL {} with HTTP {} : {} and response {} ", this.name, - restProducerProperties.getUrl(), restProducerProperties.getHttpMethod(), event, response); + restProducerProperties.getUrl(), restProducerProperties.getHttpMethod(), event, response); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RESTClientCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RESTClientCarrierTechnologyParameters.java deleted file mode 100644 index d260cbeca..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RESTClientCarrierTechnologyParameters.java +++ /dev/null @@ -1,135 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.restclient; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.GroupValidationResult; -import org.onap.policy.common.parameters.ValidationStatus; - -/** - * Apex parameters for REST as an event carrier technology with Apex as a REST client. - * - *

The parameters for this plugin are: - *

    - *
  1. url: The URL that the Apex Rest client will connect to over REST for event reception or event sending. This - * parameter is mandatory. - *
  2. httpMethod: The HTTP method to use when sending events over REST, legal values are POST (default) and PUT. When - * receiving events, the REST client plugin always uses the HTTP GET method. - *
- * - * @author Joss Armstrong (joss.armstrong@ericsson.com) - */ -public class RESTClientCarrierTechnologyParameters extends CarrierTechnologyParameters { - - /** The label of this carrier technology. */ - public static final String RESTCLIENT_CARRIER_TECHNOLOGY_LABEL = "RESTCLIENT"; - - /** The producer plugin class for the REST carrier technology. */ - public static final String RESTCLIENT_EVENT_PRODUCER_PLUGIN_CLASS = ApexRestClientProducer.class.getCanonicalName(); - - /** The consumer plugin class for the REST carrier technology. */ - public static final String RESTCLIENT_EVENT_CONSUMER_PLUGIN_CLASS = ApexRestClientConsumer.class.getCanonicalName(); - - /** The default HTTP method for output of events. */ - public static final String DEFAULT_PRODUCER_HTTP_METHOD = "POST"; - - /** The HTTP method for input of events. */ - public static final String CONSUMER_HTTP_METHOD = "GET"; - - private String url = null; - private String httpMethod = null; - - /** - * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter - * service. - */ - public RESTClientCarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the web socket carrier technology - this.setLabel(RESTCLIENT_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(RESTCLIENT_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(RESTCLIENT_EVENT_CONSUMER_PLUGIN_CLASS); - - } - - /** - * Gets the URL for the REST request. - * - * @return the URL - */ - public String getUrl() { - return url; - } - - /** - * Sets the URL for the REST request. - * - * @param incomingURL the URL - */ - public void setURL(final String incomingURL) { - this.url = incomingURL; - } - - /** - * Gets the HTTP method to use for the REST request. - * - * @return the HTTP method - */ - public String getHttpMethod() { - return httpMethod; - } - - /** - * Sets the HTTP method to use for the REST request. - * - * @param httpMethod the HTTP method - */ - public void setHttpMethod(final String httpMethod) { - this.httpMethod = httpMethod; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "RESTClientCarrierTechnologyParameters [url=" + url + ", httpMethod=" + httpMethod + "]"; - } - - /* - * - * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() - */ - @Override - public GroupValidationResult validate() { - final GroupValidationResult result = super.validate(); - - // Check if the URL has been set for event output - if (getUrl() == null) { - result.setResult("url", ValidationStatus.INVALID, "no URL has been set for event sending on REST client"); - } - - return result; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RestClientCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RestClientCarrierTechnologyParameters.java new file mode 100644 index 000000000..86c8bb4cf --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restclient/src/main/java/org/onap/policy/apex/plugins/event/carrier/restclient/RestClientCarrierTechnologyParameters.java @@ -0,0 +1,135 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.restclient; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationStatus; + +/** + * Apex parameters for REST as an event carrier technology with Apex as a REST client. + * + *

The parameters for this plugin are: + *

    + *
  1. url: The URL that the Apex Rest client will connect to over REST for event reception or event sending. This + * parameter is mandatory. + *
  2. httpMethod: The HTTP method to use when sending events over REST, legal values are POST (default) and PUT. When + * receiving events, the REST client plugin always uses the HTTP GET method. + *
+ * + * @author Joss Armstrong (joss.armstrong@ericsson.com) + */ +public class RestClientCarrierTechnologyParameters extends CarrierTechnologyParameters { + + /** The label of this carrier technology. */ + public static final String RESTCLIENT_CARRIER_TECHNOLOGY_LABEL = "RESTCLIENT"; + + /** The producer plugin class for the REST carrier technology. */ + public static final String RESTCLIENT_EVENT_PRODUCER_PLUGIN_CLASS = ApexRestClientProducer.class.getCanonicalName(); + + /** The consumer plugin class for the REST carrier technology. */ + public static final String RESTCLIENT_EVENT_CONSUMER_PLUGIN_CLASS = ApexRestClientConsumer.class.getCanonicalName(); + + /** The default HTTP method for output of events. */ + public static final String DEFAULT_PRODUCER_HTTP_METHOD = "POST"; + + /** The HTTP method for input of events. */ + public static final String CONSUMER_HTTP_METHOD = "GET"; + + private String url = null; + private String httpMethod = null; + + /** + * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter + * service. + */ + public RestClientCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the web socket carrier technology + this.setLabel(RESTCLIENT_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(RESTCLIENT_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(RESTCLIENT_EVENT_CONSUMER_PLUGIN_CLASS); + + } + + /** + * Gets the URL for the REST request. + * + * @return the URL + */ + public String getUrl() { + return url; + } + + /** + * Sets the URL for the REST request. + * + * @param incomingUrl the URL + */ + public void setUrl(final String incomingUrl) { + this.url = incomingUrl; + } + + /** + * Gets the HTTP method to use for the REST request. + * + * @return the HTTP method + */ + public String getHttpMethod() { + return httpMethod; + } + + /** + * Sets the HTTP method to use for the REST request. + * + * @param httpMethod the HTTP method + */ + public void setHttpMethod(final String httpMethod) { + this.httpMethod = httpMethod; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "RESTClientCarrierTechnologyParameters [url=" + url + ", httpMethod=" + httpMethod + "]"; + } + + /* + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public GroupValidationResult validate() { + final GroupValidationResult result = super.validate(); + + // Check if the URL has been set for event output + if (getUrl() == null) { + result.setResult("url", ValidationStatus.INVALID, "no URL has been set for event sending on REST client"); + } + + return result; + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequest.java index 12b9a695c..4b16d30d4 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequest.java @@ -31,32 +31,67 @@ public class ApexRestRequest { private Object event; private long timestamp; + /** + * Instantiates a new apex rest request. + * + * @param executionId the execution id + * @param eventName the event name + * @param event the event + */ public ApexRestRequest(final long executionId, final String eventName, final Object event) { this.executionId = executionId; this.eventName = eventName; this.event = event; } + /** + * Gets the execution id. + * + * @return the execution id + */ public long getExecutionId() { return executionId; } + /** + * Gets the event name. + * + * @return the event name + */ public String getEventName() { return eventName; } + /** + * Gets the event. + * + * @return the event + */ public Object getEvent() { return event; } + /** + * Gets the timestamp. + * + * @return the timestamp + */ public long getTimestamp() { return timestamp; } + /** + * Sets the timestamp. + * + * @param timestamp the new timestamp + */ public void setTimestamp(final long timestamp) { this.timestamp = timestamp; } + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ @Override public String toString() { return "ApexRestRequest [executionId=" + executionId + ", eventName=" + eventName + ", event=" + event diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index 9998349db..dea839ebb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -49,8 +49,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class implements an Apex event consumer that issues a REST request and returns the REST - * response to APEX as an event. + * This class implements an Apex event consumer that issues a REST request and returns the REST response to APEX as an + * event. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -63,10 +63,10 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { private static final long REST_REQUESTOR_WAIT_SLEEP_TIME = 50; // The REST parameters read from the parameter service - private RESTRequestorCarrierTechnologyParameters restConsumerProperties; + private RestRequestorCarrierTechnologyParameters restConsumerProperties; // The timeout for REST requests - private long restRequestTimeout = RESTRequestorCarrierTechnologyParameters.DEFAULT_REST_REQUEST_TIMEOUT; + private long restRequestTimeout = RestRequestorCarrierTechnologyParameters.DEFAULT_REST_REQUEST_TIMEOUT; // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; @@ -99,25 +99,25 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the REST Properties if (!(consumerParameters - .getCarrierTechnologyParameters() instanceof RESTRequestorCarrierTechnologyParameters)) { - final String errorMessage = - "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")"; + .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties are not applicable to REST Requestor consumer (" + + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restConsumerProperties = - (RESTRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + restConsumerProperties = (RestRequestorCarrierTechnologyParameters) consumerParameters + .getCarrierTechnologyParameters(); // Check if we are in peered mode if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { final String errorMessage = "REST Requestor consumer (" + this.name - + ") must run in peered requestor mode with a REST Requestor producer"; + + ") must run in peered requestor mode with a REST Requestor producer"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } @@ -125,7 +125,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Check if the HTTP method has been set if (restConsumerProperties.getHttpMethod() == null) { restConsumerProperties - .setHttpMethod(RESTRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); + .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); } // Check if the HTTP URL has been set @@ -164,8 +164,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { try { incomingRestRequestQueue.add(restRequest); } catch (final Exception e) { - final String errorMessage = - "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")"; + final String errorMessage = "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventRuntimeException(errorMessage); } @@ -196,7 +196,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { } /** - * Get the number of events received to date + * Get the number of events received to date. * * @return the number of events received */ @@ -238,8 +238,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { while (consumerThread.isAlive() && !stopOrderedFlag) { try { // Take the next event from the queue - final ApexRestRequest restRequest = - incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); + final ApexRestRequest restRequest = incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, + TimeUnit.MILLISECONDS); if (restRequest == null) { // Poll timed out, check for request timeouts timeoutExpiredRequests(); @@ -268,7 +268,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { } /** - * This method times out REST requests that have expired + * This method times out REST requests that have expired. */ private void timeoutExpiredRequests() { // Hold a list of timed out requests @@ -284,8 +284,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Interrupt timed out requests and remove them from the ongoing map for (final ApexRestRequest timedoutRequest : timedoutRequestList) { - final String errorMessage = - "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest; + final String errorMessage = "REST Requestor consumer (" + this.name + "), REST request timed out: " + + timedoutRequest; LOGGER.warn(errorMessage); ongoingRestRequestMap.remove(timedoutRequest); @@ -321,7 +321,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { private Thread restRequestThread; /** - * Constructor, initialise the request runner with the request + * Constructor, initialise the request runner with the request. * * @param request the request this runner will issue */ @@ -341,28 +341,29 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { try { // Execute the REST request - final Response response = sendEventAsRESTRequest(); + final Response response = sendEventAsRestRequest(); // Check that the event request worked if (response.getStatus() != Response.Status.OK.getStatusCode()) { final String errorMessage = "reception of response to \"" + request + "\" from URL \"" - + restConsumerProperties.getUrl() + "\" failed with status code " + response.getStatus() - + " and message \"" + response.readEntity(String.class) + "\""; + + restConsumerProperties.getUrl() + "\" failed with status code " + + response.getStatus() + " and message \"" + response.readEntity(String.class) + + "\""; throw new ApexEventRuntimeException(errorMessage); } // Get the event we received - final String eventJSONString = response.readEntity(String.class); + final String eventJsonString = response.readEntity(String.class); // Check there is content - if (eventJSONString == null || eventJSONString.trim().length() == 0) { + if (eventJsonString == null || eventJsonString.trim().length() == 0) { final String errorMessage = "received an enpty response to \"" + request + "\" from URL \"" - + restConsumerProperties.getUrl() + "\""; + + restConsumerProperties.getUrl() + "\""; throw new ApexEventRuntimeException(errorMessage); } // Send the event into Apex - eventReceiver.receiveEvent(request.getExecutionId(), eventJSONString); + eventReceiver.receiveEvent(request.getExecutionId(), eventJsonString); synchronized (eventsReceivedLock) { eventsReceived++; @@ -376,7 +377,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { } /** - * Stop the REST request + * Stop the REST request. */ private void stop() { restRequestThread.interrupt(); @@ -387,21 +388,24 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { * * @return the response to the REST request */ - public Response sendEventAsRESTRequest() { + public Response sendEventAsRestRequest() { switch (restConsumerProperties.getHttpMethod()) { case GET: return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON).get(); case PUT: return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) - .put(Entity.json(request.getEvent())); + .put(Entity.json(request.getEvent())); case POST: return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) - .post(Entity.json(request.getEvent())); + .post(Entity.json(request.getEvent())); case DELETE: return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON).delete(); + + default: + break; } return null; diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java index 721dfb683..69ad05b27 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorProducer.java @@ -44,7 +44,7 @@ public class ApexRestRequestorProducer implements ApexEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorProducer.class); // The REST carrier properties - private RESTRequestorCarrierTechnologyParameters restProducerProperties; + private RestRequestorCarrierTechnologyParameters restProducerProperties; // The name for this producer private String name = null; @@ -68,14 +68,14 @@ public class ApexRestRequestorProducer implements ApexEventProducer { // Check and get the REST Properties if (!(producerParameters - .getCarrierTechnologyParameters() instanceof RESTRequestorCarrierTechnologyParameters)) { + .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { final String errorMessage = "specified consumer properties are not applicable to REST requestor producer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } restProducerProperties = - (RESTRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + (RestRequestorCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); // Check if we are in peered mode if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RESTRequestorCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RESTRequestorCarrierTechnologyParameters.java deleted file mode 100644 index 65eb731ed..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RESTRequestorCarrierTechnologyParameters.java +++ /dev/null @@ -1,125 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.restrequestor; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; - -/** - * Apex parameters for REST as an event carrier technology with Apex issuing a REST request and receiving a REST - * response. - * - *

The parameters for this plugin are: - *

    - *
  1. url: The URL that the Apex Rest Requestor will connect to over REST for REST request sending. This parameter is - * mandatory. - *
  2. httpMethod: The HTTP method to use when making requests over REST, legal values are GET (default), POST, PUT, and - * DELETE. - *
  3. restRequestTimeout: The time in milliseconds to wait for a REST request to complete. - *
- * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class RESTRequestorCarrierTechnologyParameters extends CarrierTechnologyParameters { - /** The supported HTTP methods. */ - public enum HTTP_METHOD { - GET, PUT, POST, DELETE - } - - /** The label of this carrier technology. */ - public static final String RESTREQUESTOR_CARRIER_TECHNOLOGY_LABEL = "RESTREQUESTOR"; - - /** The producer plugin class for the REST carrier technology. */ - public static final String RESTREQUSTOR_EVENT_PRODUCER_PLUGIN_CLASS = - ApexRestRequestorProducer.class.getCanonicalName(); - - /** The consumer plugin class for the REST carrier technology. */ - public static final String RESTREQUSTOR_EVENT_CONSUMER_PLUGIN_CLASS = - ApexRestRequestorConsumer.class.getCanonicalName(); - - /** The default HTTP method for request events. */ - public static final HTTP_METHOD DEFAULT_REQUESTOR_HTTP_METHOD = HTTP_METHOD.GET; - - /** The default timeout for REST requests. */ - public static final long DEFAULT_REST_REQUEST_TIMEOUT = 500; - - private String url = null; - private HTTP_METHOD httpMethod = null; - - /** - * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter - * service. - */ - public RESTRequestorCarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the web socket carrier technology - this.setLabel(RESTREQUESTOR_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(RESTREQUSTOR_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(RESTREQUSTOR_EVENT_CONSUMER_PLUGIN_CLASS); - } - - /** - * Gets the URL for the REST request. - * - * @return the URL - */ - public String getUrl() { - return url; - } - - /** - * Sets the URL for the REST request. - * - * @param incomingURL the URL - */ - public void setURL(final String incomingURL) { - this.url = incomingURL; - } - - /** - * Gets the HTTP method to use for the REST request. - * - * @return the HTTP method - */ - public HTTP_METHOD getHttpMethod() { - return httpMethod; - } - - /** - * Sets the HTTP method to use for the REST request. - * - * @param httpMethod the HTTP method - */ - public void setHttpMethod(final HTTP_METHOD httpMethod) { - this.httpMethod = httpMethod; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#toString() - */ - - @Override - public String toString() { - return "RESTRequestorCarrierTechnologyParameters [url=" + url + ", httpMethod=" + httpMethod + "]"; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParameters.java new file mode 100644 index 000000000..acd5e52e8 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/RestRequestorCarrierTechnologyParameters.java @@ -0,0 +1,125 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.restrequestor; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * Apex parameters for REST as an event carrier technology with Apex issuing a REST request and receiving a REST + * response. + * + *

The parameters for this plugin are: + *

    + *
  1. url: The URL that the Apex Rest Requestor will connect to over REST for REST request sending. This parameter is + * mandatory. + *
  2. httpMethod: The HTTP method to use when making requests over REST, legal values are GET (default), POST, PUT, and + * DELETE. + *
  3. restRequestTimeout: The time in milliseconds to wait for a REST request to complete. + *
+ * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class RestRequestorCarrierTechnologyParameters extends CarrierTechnologyParameters { + /** The supported HTTP methods. */ + public enum HttpMethod { + GET, PUT, POST, DELETE + } + + /** The label of this carrier technology. */ + public static final String RESTREQUESTOR_CARRIER_TECHNOLOGY_LABEL = "RESTREQUESTOR"; + + /** The producer plugin class for the REST carrier technology. */ + public static final String RESTREQUSTOR_EVENT_PRODUCER_PLUGIN_CLASS = + ApexRestRequestorProducer.class.getCanonicalName(); + + /** The consumer plugin class for the REST carrier technology. */ + public static final String RESTREQUSTOR_EVENT_CONSUMER_PLUGIN_CLASS = + ApexRestRequestorConsumer.class.getCanonicalName(); + + /** The default HTTP method for request events. */ + public static final HttpMethod DEFAULT_REQUESTOR_HTTP_METHOD = HttpMethod.GET; + + /** The default timeout for REST requests. */ + public static final long DEFAULT_REST_REQUEST_TIMEOUT = 500; + + private String url = null; + private HttpMethod httpMethod = null; + + /** + * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter + * service. + */ + public RestRequestorCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the web socket carrier technology + this.setLabel(RESTREQUESTOR_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(RESTREQUSTOR_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(RESTREQUSTOR_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the URL for the REST request. + * + * @return the URL + */ + public String getUrl() { + return url; + } + + /** + * Sets the URL for the REST request. + * + * @param incomingUrl the URL + */ + public void setUrl(final String incomingUrl) { + this.url = incomingUrl; + } + + /** + * Gets the HTTP method to use for the REST request. + * + * @return the HTTP method + */ + public HttpMethod getHttpMethod() { + return httpMethod; + } + + /** + * Sets the HTTP method to use for the REST request. + * + * @param httpMethod the HTTP method + */ + public void setHttpMethod(final HttpMethod httpMethod) { + this.httpMethod = httpMethod; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + + @Override + public String toString() { + return "RESTRequestorCarrierTechnologyParameters [url=" + url + ", httpMethod=" + httpMethod + "]"; + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRESTRequestor.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRESTRequestor.java deleted file mode 100644 index 3db0f1467..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRESTRequestor.java +++ /dev/null @@ -1,291 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.apps.uservice.test.adapt.restrequestor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.gson.Gson; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.net.URI; -import java.util.Map; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.core.Response; - -import org.glassfish.grizzly.http.server.HttpServer; -import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory; -import org.glassfish.jersey.server.ResourceConfig; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; -import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.model.basicmodel.concepts.ApexException; -import org.onap.policy.apex.service.engine.main.ApexMain; - -public class TestRESTRequestor { - private static final String BASE_URI = "http://localhost:32801/TestRESTRequestor"; - private static HttpServer server; - - private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); - private ByteArrayOutputStream errContent = new ByteArrayOutputStream(); - - private final PrintStream stdout = System.out; - private final PrintStream stderr = System.err; - - @BeforeClass - public static void setUp() throws Exception { - final ResourceConfig rc = new ResourceConfig(TestRestRequestorEndpoint.class); - server = GrizzlyHttpServerFactory.createHttpServer(URI.create(BASE_URI), rc); - - while (!server.isStarted()) { - ThreadUtilities.sleep(50); - } - } - - @AfterClass - public static void tearDown() throws Exception { - server.shutdownNow(); - - new File("src/test/resources/events/EventsOut.json").delete(); - new File("src/test/resources/events/EventsOutMulti0.json").delete(); - new File("src/test/resources/events/EventsOutMulti1.json").delete(); - } - - @Before - public void resetCounters() { - TestRestRequestorEndpoint.resetCounters(); - } - - @Test - public void testRESTRequestorGet() throws MessagingException, ApexException, IOException { - final Client client = ClientBuilder.newClient(); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FileGet.json" }; - final ApexMain apexMain = new ApexMain(args); - - // Wait for the required amount of events to be received or for 10 seconds - Double getsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map jsonMap = new Gson().fromJson(responseString, Map.class); - getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); - - if (getsSoFar >= 50.0) { - break; - } - } - - apexMain.shutdown(); - client.close(); - - assertEquals(Double.valueOf(50.0), getsSoFar); - } - - @Test - public void testRESTRequestorPut() throws MessagingException, ApexException, IOException { - final Client client = ClientBuilder.newClient(); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FilePut.json" }; - final ApexMain apexMain = new ApexMain(args); - - // Wait for the required amount of events to be received or for 10 seconds - Double putsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map jsonMap = new Gson().fromJson(responseString, Map.class); - putsSoFar = Double.valueOf(jsonMap.get("PUT").toString()); - - if (putsSoFar >= 50.0) { - break; - } - } - - apexMain.shutdown(); - client.close(); - - assertEquals(Double.valueOf(50.0), putsSoFar); - } - - @Test - public void testRESTRequestorPost() throws MessagingException, ApexException, IOException { - final Client client = ClientBuilder.newClient(); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FilePost.json" }; - final ApexMain apexMain = new ApexMain(args); - - // Wait for the required amount of events to be received or for 10 seconds - Double postsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map jsonMap = new Gson().fromJson(responseString, Map.class); - postsSoFar = Double.valueOf(jsonMap.get("POST").toString()); - - if (postsSoFar >= 50.0) { - break; - } - } - - apexMain.shutdown(); - client.close(); - - assertEquals(Double.valueOf(50.0), postsSoFar); - } - - @Test - public void testRESTRequestorDelete() throws MessagingException, ApexException, IOException { - final Client client = ClientBuilder.newClient(); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FileDelete.json" }; - final ApexMain apexMain = new ApexMain(args); - - // Wait for the required amount of events to be received or for 10 seconds - Double deletesSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map jsonMap = new Gson().fromJson(responseString, Map.class); - deletesSoFar = Double.valueOf(jsonMap.get("DELETE").toString()); - - if (deletesSoFar >= 50.0) { - break; - } - } - - apexMain.shutdown(); - client.close(); - - assertEquals(Double.valueOf(50.0), deletesSoFar); - } - - @Test - public void testRESTRequestorMultiInputs() throws MessagingException, ApexException, IOException { - final Client client = ClientBuilder.newClient(); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json" }; - final ApexMain apexMain = new ApexMain(args); - - // Wait for the required amount of events to be received or for 10 seconds - Double getsSoFar = 0.0; - for (int i = 0; i < 40; i++) { - ThreadUtilities.sleep(100); - - final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") - .request("application/json").get(); - - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - final String responseString = response.readEntity(String.class); - - @SuppressWarnings("unchecked") - final Map jsonMap = new Gson().fromJson(responseString, Map.class); - getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); - - if (getsSoFar >= 8.0) { - break; - } - } - - apexMain.shutdown(); - client.close(); - - assertEquals(Double.valueOf(8.0), getsSoFar); - - ThreadUtilities.sleep(1000); - } - - @Test - public void testRESTRequestorProducerAlone() throws MessagingException, ApexException, IOException { - System.setOut(new PrintStream(outContent)); - System.setErr(new PrintStream(errContent)); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json" }; - - final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(200); - apexMain.shutdown(); - - final String outString = outContent.toString(); - - System.setOut(stdout); - System.setErr(stderr); - - assertTrue(outString.contains( - "REST Requestor producer (RestRequestorProducer) must run in peered requestor mode with a REST Requestor consumer")); - } - - @Test - public void testRESTRequestorConsumerAlone() throws MessagingException, ApexException, IOException { - System.setOut(new PrintStream(outContent)); - System.setErr(new PrintStream(errContent)); - - final String[] args = { "src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json" }; - - final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(200); - apexMain.shutdown(); - - final String outString = outContent.toString(); - - System.setOut(stdout); - System.setErr(stderr); - - assertTrue(outString.contains( - "peer \"RestRequestorProducer for peered mode REQUESTOR does not exist or is not defined with the same peered mode")); - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestor.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestor.java new file mode 100644 index 000000000..051647339 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestor.java @@ -0,0 +1,363 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.apps.uservice.test.adapt.restrequestor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.gson.Gson; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.util.Map; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; + +import org.glassfish.grizzly.http.server.HttpServer; +import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.service.engine.main.ApexMain; + +/** + * The Class TestRestRequestor. + */ +public class TestRestRequestor { + private static final String BASE_URI = "http://localhost:32801/TestRESTRequestor"; + private static HttpServer server; + + private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + + private final PrintStream stdout = System.out; + private final PrintStream stderr = System.err; + + /** + * Sets the up. + * + * @throws Exception the exception + */ + @BeforeClass + public static void setUp() throws Exception { + final ResourceConfig rc = new ResourceConfig(TestRestRequestorEndpoint.class); + server = GrizzlyHttpServerFactory.createHttpServer(URI.create(BASE_URI), rc); + + while (!server.isStarted()) { + ThreadUtilities.sleep(50); + } + } + + /** + * Tear down. + * + * @throws Exception the exception + */ + @AfterClass + public static void tearDown() throws Exception { + server.shutdownNow(); + + new File("src/test/resources/events/EventsOut.json").delete(); + new File("src/test/resources/events/EventsOutMulti0.json").delete(); + new File("src/test/resources/events/EventsOutMulti1.json").delete(); + } + + /** + * Reset counters. + */ + @Before + public void resetCounters() { + TestRestRequestorEndpoint.resetCounters(); + } + + /** + * Test rest requestor get. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorGet() throws MessagingException, ApexException, IOException { + final Client client = ClientBuilder.newClient(); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FileGet.json" }; + final ApexMain apexMain = new ApexMain(args); + + // Wait for the required amount of events to be received or for 10 seconds + Double getsSoFar = 0.0; + for (int i = 0; i < 40; i++) { + ThreadUtilities.sleep(100); + + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map jsonMap = new Gson().fromJson(responseString, Map.class); + getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); + + if (getsSoFar >= 50.0) { + break; + } + } + + apexMain.shutdown(); + client.close(); + + assertEquals(Double.valueOf(50.0), getsSoFar); + } + + /** + * Test REST requestor put. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorPut() throws MessagingException, ApexException, IOException { + final Client client = ClientBuilder.newClient(); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FilePut.json" }; + final ApexMain apexMain = new ApexMain(args); + + // Wait for the required amount of events to be received or for 10 seconds + Double putsSoFar = 0.0; + for (int i = 0; i < 40; i++) { + ThreadUtilities.sleep(100); + + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map jsonMap = new Gson().fromJson(responseString, Map.class); + putsSoFar = Double.valueOf(jsonMap.get("PUT").toString()); + + if (putsSoFar >= 50.0) { + break; + } + } + + apexMain.shutdown(); + client.close(); + + assertEquals(Double.valueOf(50.0), putsSoFar); + } + + /** + * Test REST requestor post. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorPost() throws MessagingException, ApexException, IOException { + final Client client = ClientBuilder.newClient(); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FilePost.json" }; + final ApexMain apexMain = new ApexMain(args); + + // Wait for the required amount of events to be received or for 10 seconds + Double postsSoFar = 0.0; + for (int i = 0; i < 40; i++) { + ThreadUtilities.sleep(100); + + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map jsonMap = new Gson().fromJson(responseString, Map.class); + postsSoFar = Double.valueOf(jsonMap.get("POST").toString()); + + if (postsSoFar >= 50.0) { + break; + } + } + + apexMain.shutdown(); + client.close(); + + assertEquals(Double.valueOf(50.0), postsSoFar); + } + + /** + * Test REST requestor delete. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorDelete() throws MessagingException, ApexException, IOException { + final Client client = ClientBuilder.newClient(); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FileDelete.json" }; + final ApexMain apexMain = new ApexMain(args); + + // Wait for the required amount of events to be received or for 10 seconds + Double deletesSoFar = 0.0; + for (int i = 0; i < 40; i++) { + ThreadUtilities.sleep(100); + + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map jsonMap = new Gson().fromJson(responseString, Map.class); + deletesSoFar = Double.valueOf(jsonMap.get("DELETE").toString()); + + if (deletesSoFar >= 50.0) { + break; + } + } + + apexMain.shutdown(); + client.close(); + + assertEquals(Double.valueOf(50.0), deletesSoFar); + } + + /** + * Test REST requestor multi inputs. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorMultiInputs() throws MessagingException, ApexException, IOException { + final Client client = ClientBuilder.newClient(); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json" }; + final ApexMain apexMain = new ApexMain(args); + + // Wait for the required amount of events to be received or for 10 seconds + Double getsSoFar = 0.0; + for (int i = 0; i < 40; i++) { + ThreadUtilities.sleep(100); + + final Response response = client.target("http://localhost:32801/TestRESTRequestor/apex/event/Stats") + .request("application/json").get(); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + final String responseString = response.readEntity(String.class); + + @SuppressWarnings("unchecked") + final Map jsonMap = new Gson().fromJson(responseString, Map.class); + getsSoFar = Double.valueOf(jsonMap.get("GET").toString()); + + if (getsSoFar >= 8.0) { + break; + } + } + + apexMain.shutdown(); + client.close(); + + assertEquals(Double.valueOf(8.0), getsSoFar); + + ThreadUtilities.sleep(1000); + } + + /** + * Test REST requestor producer alone. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorProducerAlone() throws MessagingException, ApexException, IOException { + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json" }; + + final ApexMain apexMain = new ApexMain(args); + ThreadUtilities.sleep(200); + apexMain.shutdown(); + + final String outString = outContent.toString(); + + System.setOut(stdout); + System.setErr(stderr); + + assertTrue(outString.contains("REST Requestor producer (RestRequestorProducer) " + + "must run in peered requestor mode with a REST Requestor consumer")); + } + + /** + * Test REST requestor consumer alone. + * + * @throws MessagingException the messaging exception + * @throws ApexException the apex exception + * @throws IOException Signals that an I/O exception has occurred. + */ + @Test + public void testRestRequestorConsumerAlone() throws MessagingException, ApexException, IOException { + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + final String[] args = + { "src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json" }; + + final ApexMain apexMain = new ApexMain(args); + ThreadUtilities.sleep(200); + apexMain.shutdown(); + + final String outString = outContent.toString(); + + System.setOut(stdout); + System.setErr(stderr); + + assertTrue(outString.contains("peer \"RestRequestorProducer for peered mode REQUESTOR " + + "does not exist or is not defined with the same peered mode")); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestorEndpoint.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestorEndpoint.java index 4977b3efb..0de34eb9b 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestorEndpoint.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/java/org/onap/policy/apex/apps/uservice/test/adapt/restrequestor/TestRestRequestorEndpoint.java @@ -34,6 +34,9 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.core.Response; +/** + * The Class TestRestRequestorEndpoint. + */ @Path("/apex") public class TestRestRequestorEndpoint { @@ -50,6 +53,9 @@ public class TestRestRequestorEndpoint { + "\"TestMatchCase\": 2,\n" + "\"TestTimestamp\": " + System.currentTimeMillis() + ",\n" + "\"TestTemperature\": 9080.866\n" + "}"; + /** + * Reset counters. + */ public static void resetCounters() { postMessagesReceived = 0; putMessagesReceived = 0; @@ -58,6 +64,11 @@ public class TestRestRequestorEndpoint { deleteMessagesReceived = 0; } + /** + * Service get stats. + * + * @return the response + */ @Path("/event/Stats") @GET public Response serviceGetStats() { @@ -71,6 +82,11 @@ public class TestRestRequestorEndpoint { .build(); } + /** + * Service get event. + * + * @return the response + */ @Path("/event/GetEvent") @GET public Response serviceGetEvent() { @@ -81,18 +97,34 @@ public class TestRestRequestorEndpoint { return Response.status(200).entity(EVENT_STRING).build(); } + /** + * Service get empty event. + * + * @return the response + */ @Path("/event/GetEmptyEvent") @GET public Response serviceGetEmptyEvent() { return Response.status(200).build(); } + /** + * Service get event bad response. + * + * @return the response + */ @Path("/event/GetEventBadResponse") @GET public Response serviceGetEventBadResponse() { return Response.status(400).build(); } + /** + * Service post request. + * + * @param jsonString the json string + * @return the response + */ @Path("/event/PostEvent") @POST public Response servicePostRequest(final String jsonString) { @@ -111,12 +143,24 @@ public class TestRestRequestorEndpoint { return Response.status(200).entity(EVENT_STRING).build(); } + /** + * Service post request bad response. + * + * @param jsonString the json string + * @return the response + */ @Path("/event/PostEventBadResponse") @POST public Response servicePostRequestBadResponse(final String jsonString) { return Response.status(400).build(); } + /** + * Service put request. + * + * @param jsonString the json string + * @return the response + */ @Path("/event/PutEvent") @PUT public Response servicePutRequest(final String jsonString) { @@ -135,6 +179,12 @@ public class TestRestRequestorEndpoint { return Response.status(200).entity(EVENT_STRING).build(); } + /** + * Service delete request. + * + * @param jsonString the json string + * @return the response + */ @Path("/event/DeleteEvent") @DELETE public Response serviceDeleteRequest(final String jsonString) { @@ -145,6 +195,12 @@ public class TestRestRequestorEndpoint { return Response.status(200).entity(EVENT_STRING).build(); } + /** + * Service delete request bad response. + * + * @param jsonString the json string + * @return the response + */ @Path("/event/DeleteEventBadResponse") @DELETE public Response serviceDeleteRequestBadResponse(final String jsonString) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileDelete.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileDelete.json index 6e12c0b1f..46da3970a 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileDelete.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileDelete.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/DeleteEvent", "httpMethod": "DELETE", @@ -49,7 +49,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGet.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGet.json index d0879eb73..3c1d314fe 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGet.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGet.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/GetEvent", "httpMethod": "GET", @@ -49,7 +49,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json index b957b61de..9ebe89df5 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetConsumerAlone.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/GetEvent", "httpMethod": "GET", @@ -49,7 +49,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json index ba1c997f5..424d2b454 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetMulti.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer0": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/GetEvent", "httpMethod": "GET", @@ -58,7 +58,7 @@ "RestRequestorConsumer1": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/GetEvent", "httpMethod": "GET", @@ -78,7 +78,7 @@ "RestRequestorProducer0": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" @@ -103,7 +103,7 @@ "RestRequestorProducer1": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json index a635e6c72..4fcbf59d9 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FileGetProducerAlone.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -31,7 +31,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePost.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePost.json index 9db89ce89..fe5af67ee 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePost.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePost.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/PostEvent", "httpMethod": "POST", @@ -49,7 +49,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePut.json b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePut.json index 3eebe3d8a..e78446447 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePut.json +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/test/resources/prodcons/File2RESTRequest2FilePut.json @@ -9,7 +9,7 @@ "engineParameters": { "executorParameters": { "MVEL": { - "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MVELExecutorParameters" + "parameterClassName": "org.onap.policy.apex.plugins.executor.mvel.MvelExecutorParameters" } } } @@ -29,7 +29,7 @@ "RestRequestorConsumer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters", "parameters": { "url": "http://localhost:32801/TestRESTRequestor/apex/event/PutEvent", "httpMethod": "PUT", @@ -49,7 +49,7 @@ "RestRequestorProducer": { "carrierTechnologyParameters": { "carrierTechnology": "RESTREQUESTOR", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters" + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restrequestor.RestRequestorCarrierTechnologyParameters" }, "eventProtocolParameters": { "eventProtocol": "JSON" diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java index 8cf0c8f9c..94063af20 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerConsumer.java @@ -56,9 +56,6 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { // The amount of time to wait in milliseconds between checks that the consumer thread has stopped private static final long REST_SERVER_CONSUMER_WAIT_SLEEP_TIME = 50; - // The REST parameters read from the parameter service - private RESTServerCarrierTechnologyParameters restConsumerProperties; - // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; @@ -84,7 +81,7 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { * * @return the next candidate value for a Execution ID */ - private static synchronized long getNextExecutionID() { + private static synchronized long getNextExecutionId() { return nextExecutionID.getAndIncrement(); } @@ -102,14 +99,16 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { this.name = consumerName; // Check and get the REST Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RESTServerCarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) { final String errorMessage = "specified consumer properties are not applicable to REST Server consumer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restConsumerProperties = - (RESTServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // The REST parameters read from the parameter service + RestServerCarrierTechnologyParameters restConsumerProperties = + (RestServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Check if we are in synchronous mode if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) { @@ -131,12 +130,12 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { } // Compose the URI for the standalone server - final String baseURI = String.format(BASE_URI_TEMPLATE, restConsumerProperties.getHost(), + final String baseUrl = String.format(BASE_URI_TEMPLATE, restConsumerProperties.getHost(), restConsumerProperties.getPort()); // Instantiate the standalone server final ResourceConfig rc = new ResourceConfig(RestServerEndpoint.class, AccessControlFilter.class); - server = GrizzlyHttpServerFactory.createHttpServer(URI.create(baseURI), rc); + server = GrizzlyHttpServerFactory.createHttpServer(URI.create(baseUrl), rc); while (!server.isStarted()) { ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME); @@ -201,10 +200,11 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { */ public Response receiveEvent(final String event) { // Get an execution ID for the event - final long executionId = getNextExecutionID(); + final long executionId = getNextExecutionId(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug(name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event); + String message = name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event; + LOGGER.debug(message); } try { @@ -222,7 +222,8 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { // Wait until the event is in the cache of events sent to apex do { ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME); - } while (!synchronousEventCache.existsEventToApex(executionId)); + } + while (!synchronousEventCache.existsEventToApex(executionId)); // Now wait for the reply or for the event to time put do { @@ -238,7 +239,8 @@ public class ApexRestServerConsumer implements ApexEventConsumer, Runnable { // Return the event as a response to the call return Response.status(Response.Status.OK.getStatusCode()).entity(responseEvent.toString()).build(); } - } while (synchronousEventCache.existsEventToApex(executionId)); + } + while (synchronousEventCache.existsEventToApex(executionId)); // The event timed out final String errorMessage = "processing of event on event consumer " + name + " timed out, event=" + event; diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerProducer.java index e51482ce4..cacdb3408 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/ApexRestServerProducer.java @@ -41,9 +41,6 @@ import org.slf4j.LoggerFactory; public class ApexRestServerProducer implements ApexEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerProducer.class); - // The REST carrier properties - private RESTServerCarrierTechnologyParameters restProducerProperties; - // The name for this producer private String name = null; @@ -62,14 +59,16 @@ public class ApexRestServerProducer implements ApexEventProducer { this.name = producerName; // Check and get the REST Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof RESTServerCarrierTechnologyParameters)) { + if (!(producerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) { final String errorMessage = "specified producer properties are not applicable to REST Server producer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restProducerProperties = - (RESTServerCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + // The REST carrier properties + RestServerCarrierTechnologyParameters restProducerProperties = + (RestServerCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); // Check if host and port are defined if (restProducerProperties.getHost() != null || restProducerProperties.getPort() != -1 @@ -131,7 +130,8 @@ public class ApexRestServerProducer implements ApexEventProducer { @Override public void sendEvent(final long executionId, final String eventName, final Object event) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug(name + ": event " + executionId + ':' + eventName + " recevied from Apex, event=" + event); + String message = name + ": event " + executionId + ':' + eventName + " recevied from Apex, event=" + event; + LOGGER.debug(message); } // If we are not synchronized, then exit @@ -163,5 +163,7 @@ public class ApexRestServerProducer implements ApexEventProducer { * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop() */ @Override - public void stop() {} + public void stop() { + // Implementation not required on this class + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RESTServerCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RESTServerCarrierTechnologyParameters.java deleted file mode 100644 index cd7f388f2..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RESTServerCarrierTechnologyParameters.java +++ /dev/null @@ -1,136 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.restserver; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.GroupValidationResult; -import org.onap.policy.common.parameters.ValidationStatus; - -/** - * Apex parameters for REST as an event carrier technology with Apex as a REST client. - * - * The parameters for this plugin are: - *
    - *
  1. standalone: A flag indicating if APEX should start a standalone HTTP server to process REST requests (true) or - * whether it should use an underlying servlet infrastructure such as Apache Tomcat (False). This parameter is legal - * only on REST server event inputs. - *
  2. host: The host name to use when setting up a standalone HTTP server. This parameter is legal only on REST server - * event inputs in standalone mode. - *
  3. port: The port to use when setting up a standalone HTTP server. This parameter is legal only on REST server event - * inputs in standalone mode. - *
- * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class RESTServerCarrierTechnologyParameters extends CarrierTechnologyParameters { - // @formatter:off - private static final int MIN_USER_PORT = 1024; - private static final int MAX_USER_PORT = 65535; - - /** The label of this carrier technology. */ - public static final String RESTSERVER_CARRIER_TECHNOLOGY_LABEL = "RESTSERVER"; - - /** The producer plugin class for the REST carrier technology. */ - public static final String RESTSERVER_EVENT_PRODUCER_PLUGIN_CLASS = ApexRestServerProducer.class.getCanonicalName(); - - /** The consumer plugin class for the REST carrier technology. */ - public static final String RESTSERVER_EVENT_CONSUMER_PLUGIN_CLASS = ApexRestServerConsumer.class.getCanonicalName(); - - // REST server parameters - private boolean standalone = false; - private String host = null; - private int port = -1; - // @formatter:on - - /** - * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter - * service. - */ - public RESTServerCarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the web socket carrier technology - this.setLabel(RESTSERVER_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(RESTSERVER_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(RESTSERVER_EVENT_CONSUMER_PLUGIN_CLASS); - } - - /** - * Check if the REST server is running in standalone mode or is using an underlying servlet infrastructure to manage - * requests. - * - * @return true if in standalone mode - */ - public boolean isStandalone() { - return standalone; - } - - /** - * Gets the host. - * - * @return the host - */ - public String getHost() { - return host; - } - - /** - * Gets the port. - * - * @return the port - */ - public int getPort() { - return port; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() - */ - @Override - public GroupValidationResult validate() { - final GroupValidationResult result = super.validate(); - - // Check if host is defined, it is only defined on REST server consumers - if (standalone) { - if (host != null && host.trim().length() == 0) { - result.setResult("host", ValidationStatus.INVALID, - "host not specified, host must be specified as a string"); - } - - // Check if port is defined, it is only defined on REST server consumers - if (port != -1 && port < MIN_USER_PORT || port > MAX_USER_PORT) { - result.setResult("port", ValidationStatus.INVALID, - "[" + port + "] invalid, must be specified as 1024 <= port <= 65535"); - } - } else { - if (host != null) { - result.setResult("host", ValidationStatus.INVALID, "host is specified only in standalone mode"); - } - if (port != -1) { - result.setResult("port", ValidationStatus.INVALID, "port is specified only in standalone mode"); - } - } - - return result; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerCarrierTechnologyParameters.java new file mode 100644 index 000000000..76ec577b8 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerCarrierTechnologyParameters.java @@ -0,0 +1,136 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.restserver; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationStatus; + +/** + * Apex parameters for REST as an event carrier technology with Apex as a REST client. + * + *

The parameters for this plugin are: + *

    + *
  1. standalone: A flag indicating if APEX should start a standalone HTTP server to process REST requests (true) or + * whether it should use an underlying servlet infrastructure such as Apache Tomcat (False). This parameter is legal + * only on REST server event inputs. + *
  2. host: The host name to use when setting up a standalone HTTP server. This parameter is legal only on REST server + * event inputs in standalone mode. + *
  3. port: The port to use when setting up a standalone HTTP server. This parameter is legal only on REST server event + * inputs in standalone mode. + *
+ * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class RestServerCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + /** The label of this carrier technology. */ + public static final String RESTSERVER_CARRIER_TECHNOLOGY_LABEL = "RESTSERVER"; + + /** The producer plugin class for the REST carrier technology. */ + public static final String RESTSERVER_EVENT_PRODUCER_PLUGIN_CLASS = ApexRestServerProducer.class.getCanonicalName(); + + /** The consumer plugin class for the REST carrier technology. */ + public static final String RESTSERVER_EVENT_CONSUMER_PLUGIN_CLASS = ApexRestServerConsumer.class.getCanonicalName(); + + // REST server parameters + private boolean standalone = false; + private String host = null; + private int port = -1; + // @formatter:on + + /** + * Constructor to create a REST carrier technology parameters instance and register the instance with the parameter + * service. + */ + public RestServerCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the web socket carrier technology + this.setLabel(RESTSERVER_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(RESTSERVER_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(RESTSERVER_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Check if the REST server is running in standalone mode or is using an underlying servlet infrastructure to manage + * requests. + * + * @return true if in standalone mode + */ + public boolean isStandalone() { + return standalone; + } + + /** + * Gets the host. + * + * @return the host + */ + public String getHost() { + return host; + } + + /** + * Gets the port. + * + * @return the port + */ + public int getPort() { + return port; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public GroupValidationResult validate() { + final GroupValidationResult result = super.validate(); + + // Check if host is defined, it is only defined on REST server consumers + if (standalone) { + if (host != null && host.trim().length() == 0) { + result.setResult("host", ValidationStatus.INVALID, + "host not specified, host must be specified as a string"); + } + + // Check if port is defined, it is only defined on REST server consumers + if (port != -1 && port < MIN_USER_PORT || port > MAX_USER_PORT) { + result.setResult("port", ValidationStatus.INVALID, + "[" + port + "] invalid, must be specified as 1024 <= port <= 65535"); + } + } else { + if (host != null) { + result.setResult("host", ValidationStatus.INVALID, "host is specified only in standalone mode"); + } + if (port != -1) { + result.setResult("port", ValidationStatus.INVALID, "port is specified only in standalone mode"); + } + } + + return result; + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerEndpoint.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerEndpoint.java index beee10fd0..f8524fcfd 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerEndpoint.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restserver/src/main/java/org/onap/policy/apex/plugins/event/carrier/restserver/RestServerEndpoint.java @@ -57,21 +57,21 @@ public class RestServerEndpoint { // This map is used to hold all the REST server event inputs. This is used to determine which consumer to send input // events to private static Map consumerMap = - new LinkedHashMap(); + new LinkedHashMap<>(); // The ID of this event input. This gets injected from the URL. @PathParam("eventInput") - private String eventInputID = null; + private String eventInputId = null; /** * Register an Apex consumer with the REST server end point. * - * @param consumerEventInputID The event input ID that indicates this consumer shoud be used + * @param consumerEventInputId The event input ID that indicates this consumer shoud be used * @param consumer The consumer to register */ - public static void registerApexRestServerConsumer(final String consumerEventInputID, + public static void registerApexRestServerConsumer(final String consumerEventInputId, final ApexRestServerConsumer consumer) { - consumerMap.put(consumerEventInputID, consumer); + consumerMap.put(consumerEventInputId, consumer); } /** @@ -102,7 +102,8 @@ public class RestServerEndpoint { postEventMessagesReceived++; if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event input " + eventInputID + ", received POST of event \"" + jsonString + "\""); + String message = "event input " + eventInputId + ", received POST of event \"" + jsonString + "\""; + LOGGER.debug(message); } // Common handler method for POST and PUT requests @@ -121,7 +122,8 @@ public class RestServerEndpoint { putEventMessagesReceived++; if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event input \"" + eventInputID + "\", received PUT of event \"" + jsonString + "\""); + String message = "event input \"" + eventInputId + "\", received PUT of event \"" + jsonString + "\""; + LOGGER.debug(message); } // Common handler method for POST and PUT requests @@ -136,10 +138,10 @@ public class RestServerEndpoint { */ private Response handleEvent(final String jsonString) { // Find the correct consumer for this REST message - final ApexRestServerConsumer eventConsumer = consumerMap.get(eventInputID); + final ApexRestServerConsumer eventConsumer = consumerMap.get(eventInputId); if (eventConsumer == null) { final String errorMessage = - "event input " + eventInputID + " is not defined in the Apex configuration file"; + "event input " + eventInputId + " is not defined in the Apex configuration file"; LOGGER.warn(errorMessage); return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) .entity("{'errorMessage', '" + errorMessage + "'}").build(); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java index e80efa2ee..fe3e47b82 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketConsumer.java @@ -50,9 +50,6 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexWebSocketConsumer.class); - // The Web Socket properties - private WEBSOCKETCarrierTechnologyParameters webSocketConsumerProperties; - // The web socket messager, may be WS a server or a client private WsStringMessager wsStringMessager; @@ -79,12 +76,14 @@ public class ApexWebSocketConsumer implements ApexEventConsumer, WsStringMessage this.name = consumerName; // Check and get the Kafka Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof WEBSOCKETCarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof WebSocketCarrierTechnologyParameters)) { LOGGER.warn("specified consumer properties are not applicable to a web socket consumer"); throw new ApexEventException("specified consumer properties are not applicable to a web socket consumer"); } - webSocketConsumerProperties = - (WEBSOCKETCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // The Web Socket properties + WebSocketCarrierTechnologyParameters webSocketConsumerProperties = + (WebSocketCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Check if this is a server or a client Web Socket if (webSocketConsumerProperties.isWsClient()) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java index 6be8a3d1a..7dd1ab947 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/ApexWebSocketProducer.java @@ -46,9 +46,6 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexWebSocketProducer.class); - // The Web Socket properties - private WEBSOCKETCarrierTechnologyParameters webSocketProducerProperties; - // The web socket messager, may be WS a server or a client private WsStringMessager wsStringMessager; @@ -60,23 +57,25 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage @Override public void init(final String producerName, final EventHandlerParameters producerParameters) - throws ApexEventException { + throws ApexEventException { this.name = producerName; // Check and get the web socket Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof WEBSOCKETCarrierTechnologyParameters)) { - LOGGER.warn( - "specified producer properties for " + this.name + "are not applicable to a web socket producer"); + if (!(producerParameters.getCarrierTechnologyParameters() instanceof WebSocketCarrierTechnologyParameters)) { + String message = "specified producer properties for " + this.name + + "are not applicable to a web socket producer"; + LOGGER.warn(message); throw new ApexEventException("specified producer properties are not applicable to a web socket producer"); } - webSocketProducerProperties = - (WEBSOCKETCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + // The Web Socket properties + WebSocketCarrierTechnologyParameters webSocketProducerProperties = + (WebSocketCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); // Check if this is a server or a client Web Socket if (webSocketProducerProperties.isWsClient()) { // Create a WS client wsStringMessager = new WsStringMessageClient(webSocketProducerProperties.getHost(), - webSocketProducerProperties.getPort()); + webSocketProducerProperties.getPort()); } else { wsStringMessager = new WsStringMessageServer(webSocketProducerProperties.getPort()); } @@ -85,7 +84,8 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage try { wsStringMessager.start(this); } catch (final MessagingException e) { - LOGGER.warn("could not start web socket producer (" + this.name + ")"); + String message = "could not start web socket producer (" + this.name + ")"; + LOGGER.warn(message); } } @@ -130,8 +130,8 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage @Override public void sendEvent(final long executionId, final String eventName, final Object event) { // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap + .get(EventHandlerPeeredMode.SYNCHRONOUS); if (synchronousEventCache != null) { synchronousEventCache.removeCachedEventToApexIfExists(executionId); } @@ -160,7 +160,8 @@ public class ApexWebSocketProducer implements ApexEventProducer, WsStringMessage */ @Override public void receiveString(final String messageString) { - LOGGER.warn("received message \"" + messageString + "\" on web socket producer (" + this.name - + ") , no messages should be received on a web socket producer"); + String message = "received message \"" + messageString + "\" on web socket producer (" + this.name + + ") , no messages should be received on a web socket producer"; + LOGGER.warn(message); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WEBSOCKETCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WEBSOCKETCarrierTechnologyParameters.java deleted file mode 100644 index e04a81d1d..000000000 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WEBSOCKETCarrierTechnologyParameters.java +++ /dev/null @@ -1,116 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.apex.plugins.event.carrier.websocket; - -import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.GroupValidationResult; -import org.onap.policy.common.parameters.ValidationStatus; - -/** - * Apex parameters for Kafka as an event carrier technology. - * - * @author Liam Fallon (liam.fallon@ericsson.com) - */ -public class WEBSOCKETCarrierTechnologyParameters extends CarrierTechnologyParameters { - // @formatter:off - private static final int MIN_USER_PORT = 1024; - private static final int MAX_USER_PORT = 65535; - - /** The label of this carrier technology. */ - public static final String WEB_SCOKET_CARRIER_TECHNOLOGY_LABEL = "WEBSOCKET"; - - /** The producer plugin class for the web socket carrier technology. */ - public static final String WEB_SCOKET_EVENT_PRODUCER_PLUGIN_CLASS = ApexWebSocketProducer.class.getCanonicalName(); - - /** The consumer plugin class for the web socket carrier technology. */ - public static final String KWEB_SCOKET_EVENT_CONSUMER_PLUGIN_CLASS = ApexWebSocketConsumer.class.getCanonicalName(); - - // Default parameter values - private static final String DEFAULT_HOST = "localhost"; - private static final int DEFAULT_PORT = -1; - - // Web socket parameters - private boolean wsClient = true; - private String host = DEFAULT_HOST; - private int port = DEFAULT_PORT; - // @formatter:on - - /** - * Constructor to create a web socket carrier technology parameters instance and register the instance with the - * parameter service. - */ - public WEBSOCKETCarrierTechnologyParameters() { - super(); - - // Set the carrier technology properties for the web socket carrier technology - this.setLabel(WEB_SCOKET_CARRIER_TECHNOLOGY_LABEL); - this.setEventProducerPluginClass(WEB_SCOKET_EVENT_PRODUCER_PLUGIN_CLASS); - this.setEventConsumerPluginClass(KWEB_SCOKET_EVENT_CONSUMER_PLUGIN_CLASS); - } - - /** - * Gets the host. - * - * @return the host - */ - public String getHost() { - return host; - } - - /** - * Gets the port. - * - * @return the port - */ - public int getPort() { - return port; - } - - /** - * Checks if is ws client. - * - * @return true, if checks if is ws client - */ - public boolean isWsClient() { - return wsClient; - } - - /* - * (non-Javadoc) - * - * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() - */ - @Override - public GroupValidationResult validate() { - final GroupValidationResult result = super.validate(); - - if (wsClient && (host == null || host.trim().length() == 0)) { - result.setResult("host", ValidationStatus.INVALID, "host not specified, must be host as a string"); - } - - if (port < MIN_USER_PORT || port > MAX_USER_PORT) { - result.setResult("port", ValidationStatus.INVALID, - "[" + port + "] invalid, must be specified as 1024 <= port <= 65535"); - } - - return result; - } -} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WebSocketCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WebSocketCarrierTechnologyParameters.java new file mode 100644 index 000000000..9febeb45b --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-websocket/src/main/java/org/onap/policy/apex/plugins/event/carrier/websocket/WebSocketCarrierTechnologyParameters.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.websocket; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ValidationStatus; + +/** + * Apex parameters for Kafka as an event carrier technology. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class WebSocketCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + /** The label of this carrier technology. */ + public static final String WEB_SCOKET_CARRIER_TECHNOLOGY_LABEL = "WEBSOCKET"; + + /** The producer plugin class for the web socket carrier technology. */ + public static final String WEB_SCOKET_EVENT_PRODUCER_PLUGIN_CLASS = ApexWebSocketProducer.class.getCanonicalName(); + + /** The consumer plugin class for the web socket carrier technology. */ + public static final String KWEB_SCOKET_EVENT_CONSUMER_PLUGIN_CLASS = ApexWebSocketConsumer.class.getCanonicalName(); + + // Default parameter values + private static final String DEFAULT_HOST = "localhost"; + private static final int DEFAULT_PORT = -1; + + // Web socket parameters + private boolean wsClient = true; + private String host = DEFAULT_HOST; + private int port = DEFAULT_PORT; + // @formatter:on + + /** + * Constructor to create a web socket carrier technology parameters instance and register the instance with the + * parameter service. + */ + public WebSocketCarrierTechnologyParameters() { + super(); + + // Set the carrier technology properties for the web socket carrier technology + this.setLabel(WEB_SCOKET_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(WEB_SCOKET_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(KWEB_SCOKET_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the host. + * + * @return the host + */ + public String getHost() { + return host; + } + + /** + * Gets the port. + * + * @return the port + */ + public int getPort() { + return port; + } + + /** + * Checks if is ws client. + * + * @return true, if checks if is ws client + */ + public boolean isWsClient() { + return wsClient; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public GroupValidationResult validate() { + final GroupValidationResult result = super.validate(); + + if (wsClient && (host == null || host.trim().length() == 0)) { + result.setResult("host", ValidationStatus.INVALID, "host not specified, must be host as a string"); + } + + if (port < MIN_USER_PORT || port > MAX_USER_PORT) { + result.setResult("port", ValidationStatus.INVALID, + "[" + port + "] invalid, must be specified as 1024 <= port <= 65535"); + } + + return result; + } +} -- cgit 1.2.3-korg