diff options
6 files changed, 484 insertions, 444 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java new file mode 100644 index 0000000..d177989 --- /dev/null +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqClientConfig.java @@ -0,0 +1,198 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.event.client; + +import java.util.HashMap; +import java.util.Map; + +import com.rabbitmq.client.BuiltinExchangeType; + +public class RabbitMqClientConfig { + + private static String BINDING_CONSUME_ALL = "#"; + + private String hosts; + private String username; + private String password; + private String exchangeName; + private String exchangeType; + private String routingKey; + private String virtualHost; + private String queue; + private String bindingKey; + private Map<String, Object> queueArguments; + private Map<String, Object> exchangeArguments; + private String sslKeyStoreFile; + private String sslTrustStoreFile; + private String sslKeyStorePassword; + private String sslTrustStorePassword; + private String connectionTimeout; + private String enableSsl; + + public String getUsername() { + return username; + } + public void setUsername(String username) { + this.username = username; + } + + public String getHosts() { + return hosts; + } + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public String getPassword() { + return password; + } + public void setPassword(String password) { + this.password = password; + } + + public String getExchangeName() { + return exchangeName; + } + public void setExchangeName(String exchangeName) { + this.exchangeName = exchangeName; + } + + public BuiltinExchangeType getExchangeType() { + if (exchangeType == null) { + return BuiltinExchangeType.TOPIC; + } + return BuiltinExchangeType.valueOf(exchangeType); + } + public void setExchangeType(String exchangeType) { + this.exchangeType = exchangeType; + } + + public String getRoutingKey() { + if (routingKey == null) { + return ""; + } + return routingKey; + } + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + public String getVirtualHost() { + return virtualHost; + } + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public String getQueue() { + return queue; + } + public void setQueue(String queue) { + this.queue = queue; + } + + public Map<String, Object> getQueueArguments() { + if (queueArguments == null) { + return new HashMap<String, Object>(); + } + return queueArguments; + } + public void setQueueArguments(Map<String, Object> queueArguments) { + this.queueArguments = queueArguments; + } + + public Map<String, Object> getExchangeArguments() { + if (exchangeArguments == null) { + return new HashMap<String, Object>(); + } + return exchangeArguments; + } + public void setExchangeArguments(Map<String, Object> exchangeArguments) { + this.exchangeArguments = exchangeArguments; + } + + public String getBindingKey() { + if (bindingKey == null) { + return BINDING_CONSUME_ALL; + } + return bindingKey; + } + + public void setBindingKey(String bindingKey) { + this.bindingKey = bindingKey; + } + + public String getSslKeyStoreFile() { + return sslKeyStoreFile; + } + + public void setSslKeyStoreFile(String sslKeyStoreFile) { + this.sslKeyStoreFile = sslKeyStoreFile; + } + + public String getSslTrustStoreFile() { + return sslTrustStoreFile; + } + + public void setSslTrustStoreFile(String sslTrustStoreFile) { + this.sslTrustStoreFile = sslTrustStoreFile; + } + + public String getSslKeyStorePassword() { + return sslKeyStorePassword; + } + + public void setSslKeyStorePassword(String sslKeyStorePassword) { + this.sslKeyStorePassword = sslKeyStorePassword; + } + + public String getSslTrustStorePassword() { + return sslTrustStorePassword; + } + + public void setSslTrustStorePassword(String sslTrustStorePassword) { + this.sslTrustStorePassword = sslTrustStorePassword; + } + + public String getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(String connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public Boolean getEnableSsl() { + if (enableSsl != null) { + return Boolean.valueOf(enableSsl); + } else { + return false; + } + } + + public void setEnableSsl(String enableSsl) { + this.enableSsl = enableSsl; + } + + + + } diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java index c564198..c10b46c 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java @@ -22,28 +22,23 @@ package org.onap.aai.event.client; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import javax.naming.ConfigurationException; + import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.event.api.EventConsumer; import org.onap.aai.event.api.MessageWithOffset; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.AMQP.Exchange.DeclareOk; /** * Event bus client consumer wrapper for RabbitMQ. @@ -57,27 +52,16 @@ import com.rabbitmq.client.AMQP.Exchange.DeclareOk; */ public class RabbitMqEventConsumer implements EventConsumer { private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); - private static ConnectionFactory factory = new ConnectionFactory(); - private BlockingQueue<MessageWithOffset> messageQueue; - private static String BINDING_CONSUME_ALL = "#"; + private BlockingQueue<MessageWithOffset> messageQueue; private final Connection connection; private final Channel channel; - private final String queueName; - private DeclareOk exchangeInfo; private Long lastDeliveryTag; private long timeout = 5000; - private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + private RabbitMqClientConfig config; + - /** - * (intended for testing prupose only) - * @param connFactory - */ - static void setConnectionFactory(ConnectionFactory connFactory) { - factory = connFactory; - } - /** * (intended for testing prupose only) * @param messageQueue @@ -86,148 +70,29 @@ public class RabbitMqEventConsumer implements EventConsumer { this.messageQueue = messageQueue; } - /** - * Constructor to open a consumer on single host port for a topic exchange with specific queue name which will - * consume all messages from topic - * @param host - * @param port - * @param userName - * @param password - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeName, String queueName) throws Exception { - this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on single host port for a exchange with specific queue name which will - * consume all messages from topic - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { - this(host, port, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on single host port for a exchange with specific queue name - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param queueName - * @param bindingKey - Bind the queue to specific messages only - * @throws Exception - */ - public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { - this(buildMap(host, port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); - } - - /** - * Constructor to open a consumer on multiple host port for a topic exchange with specific queue name which will - * consume all messages from topic - * @param hostPortMap - * @param userName - * @param password - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName, String queueName) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on multiple host port for a exchange with specific queue name which will - * consume all messages from topic - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @param queueName - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { - this(hostPortMap, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); - } - - /** - * Constructor to open a consumer on multiple host port for a exchange with specific queue name - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param queueName - * @param bindingKey - Bind the queue to specific messages only - * @throws Exception - */ - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); - } - - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String queueName, String bindingKey, Map<String, Object> exchangeArguments) throws Exception { - messageQueue = new ArrayBlockingQueue<>(1000); - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); - this.queueName = queueName; - channel.queueDeclare(queueName, true, false, false, null); - channel.queueBind(queueName, exchangeName, bindingKey); - String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false - } - - public RabbitMqEventConsumer(String host, int port, String userName, String password, String queue) throws Exception { - this(buildMap(host, port), userName, password, queue, new HashMap<String, Object>()); - } - - public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - messageQueue = new ArrayBlockingQueue<>(1000); - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - this.queueName = queue; - queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); - String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false - } - - private static Map<String, Integer> buildMap(String host, Integer port) { - Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); - hostPortMap.put(host, port); - return hostPortMap; - } + public RabbitMqEventConsumer(RabbitMqClientConfig config) throws Exception { + this.config = config; + this.messageQueue = new ArrayBlockingQueue<>(1000); + + if (config.getQueue() == null) { + throw new ConfigurationException("Mandatory config param queue not set"); + } + + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); + + if (config.getExchangeName() != null) { + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + channel.queueBind(config.getQueue(), config.getExchangeName(), config.getBindingKey()); + } + else { + channel.queueDeclare(config.getQueue(), true, false, false, config.getQueueArguments()); + } + + channel.basicConsume(config.getQueue(), false, new CallBackConsumer(channel)); //AutoAck is false + } + @Override public Iterable<String> consumeAndCommit() throws Exception { diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java index a7e0ee3..614ae54 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java @@ -21,23 +21,16 @@ package org.onap.aai.event.client; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; + +import javax.naming.ConfigurationException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.event.api.EventPublisher; -import com.rabbitmq.client.AMQP.Exchange.DeclareOk; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; /** * Event bus client publisher wrapper for RabbitMQ @@ -46,236 +39,82 @@ import com.rabbitmq.client.ConnectionFactory; */ public class RabbitMqEventPublisher implements EventPublisher { - private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private static ConnectionFactory factory = new ConnectionFactory(); + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private final Connection connection; - private final Channel channel; - private final String exchangeName; - private final String queueName; - private final String routingKey; - private DeclareOk exchangeInfo; - private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + private final Connection connection; + private final Channel channel; - static void setConnectionFactory(ConnectionFactory connFactory) { - factory = connFactory; - } - /** - * Constructor for single host port for a topic exchange - * @param host - * @param port - * @param userName - * @param password - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeName) throws Exception { - this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); - } - - /** - * Constructor for single host port for a specific exchange type - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName) throws Exception { - this(host,port, userName, password, exchangeType, exchangeName,""); - } - - /** - * Constructor for single host port for a specific exchange type and specific routing key - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param routingKey - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { - this(buildMap(host,port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); - } - - /** - * Constructor for multiple host and port for a topic exchange - * @param hostPortMap - * @param userName - * @param password - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); - } - - /** - * Constructor for multiple host port for a specific exchange type - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName) throws Exception { - this(hostPortMap, userName, password, exchangeType , exchangeName,""); - } - - /** - * Constructor for multiple host port for a specific exchange type and specific routing key - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param routingKey - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); - } + private RabbitMqClientConfig config; + + public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception { + this.config = config; + + if (config.getExchangeName() == null) { + throw new ConfigurationException("Mandatory config param exchangeName not set"); + } + + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String routingKey, Map<String, Object> exchangeArguments) throws Exception { - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - this.exchangeName = exchangeName; - exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); //Durable exchange and non delete - queueName = null; - this.routingKey=routingKey; - } - - /** - * Constructor for single host port for a queue connection - * @param host - * @param port - * @param userName - * @param password - * @param queue - * @param queueArguments - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - this(buildMap(host, port), userName, password, queue, queueArguments); - } - - /** - * Constructor for multiple host port for a queue connection - * @param hostPortMap - * @param userName - * @param password - * @param queue - * @param queueArguments - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - exchangeName = ""; - routingKey=queue; - this.queueName = queue; - queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); //Durable, non exclusive and non auto delete queue - } - - private static Map<String, Integer> buildMap(String host, Integer port) { - Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); - hostPortMap.put(host, port); - return hostPortMap; - } + //Durable exchange and non delete + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + } - @Override - public void close() throws Exception { - channel.close(); - connection.close(); - } + @Override + public void close() throws Exception { + channel.close(); + connection.close(); + } - @Override - public void sendAsync(String message) throws Exception { - sendSync(message); - } + @Override + public void sendAsync(String message) throws Exception { + sendSync(message); + } - @Override - public void sendAsync(Collection<String> messages) throws Exception { - sendSync(messages); - } + @Override + public void sendAsync(Collection<String> messages) throws Exception { + sendSync(messages); + } - @Override - public void sendAsync(String routingParam, String message) throws Exception { - sendSync(routingParam, message); - } + @Override + public void sendAsync(String routingParam, String message) throws Exception { + sendSync(routingParam, message); + } - @Override - public void sendAsync(String routingParam, Collection<String> messages) throws Exception { - sendSync(routingParam, messages); - } + @Override + public void sendAsync(String routingParam, Collection<String> messages) throws Exception { + sendSync(routingParam, messages); + } - @Override - public int sendSync(String message) throws Exception { - channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); - log.debug(" [x] Sent '" + message + "'"); - return 1; - } + @Override + public int sendSync(String message) throws Exception { + channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes()); + log.debug(" [x] Sent '" + message + "'"); + return 1; + } - @Override - public int sendSync(Collection<String> messages) throws Exception { - log.debug("Publishing" + messages.size() + " messages "); + @Override + public int sendSync(Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(message); } return messages.size(); - } + } - @Override - public int sendSync(String routingParam, String message) throws Exception { - //Can only route if exchange is setup - if(queueName == null) { - channel.basicPublish(exchangeName, routingParam, null, message.getBytes()); - return 1; - } - else - throw new UnsupportedOperationException("Routing without exchange"); - - } + @Override + public int sendSync(String routingParam, String message) throws Exception { + channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes()); + return 1; + } - @Override - public int sendSync(String routingParam, Collection<String> messages) throws Exception { - log.debug("Publishing" + messages.size() + " messages "); - //Can only route if exchange is setup - if(queueName == null) { - for (String message : messages) { - sendSync(routingParam, message); - } - return messages.size(); - } - else - throw new UnsupportedOperationException("Routing without exchange"); - } + @Override + public int sendSync(String routingParam, Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); + for (String message : messages) { + sendSync(routingParam, message); + } + return messages.size(); + } } diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqUtils.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqUtils.java new file mode 100644 index 0000000..0c5177a --- /dev/null +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqUtils.java @@ -0,0 +1,127 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.event.client; + +import java.io.FileInputStream; +import java.security.KeyStore; +import java.util.ArrayList; +import java.util.List; + +import javax.naming.ConfigurationException; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultSaslConfig; + +public class RabbitMqUtils { + private static ConnectionFactory factory = new ConnectionFactory(); + + /** + * (intended for testing purpose only) + * @param connFactory + */ + static void setConnectionFactory(ConnectionFactory connFactory) { + factory = connFactory; + } + + public static Connection createConnection(RabbitMqClientConfig config) throws Exception { + if (config.getHosts() == null) { + throw new ConfigurationException("Mandatory config param hosts not set"); + } + + List<Address> addresses = new ArrayList<Address>(); + String[] hosts = config.getHosts().split(","); + for (String host : hosts) { + String[] parts = host.split(":"); + if (parts.length != 2) { + throw new ConfigurationException("Hosts must be specified in 'host:port' format"); + } + + int port = Integer.parseInt(parts[1]); + Address add = new Address(parts[0], port); + addresses.add(add); + } + + if (config.getUsername() == null && !config.getEnableSsl()) { + throw new ConfigurationException("Mandatory config param username not set"); + } + + factory.setUsername(config.getUsername()); + + if (config.getPassword() == null && !config.getEnableSsl()) { + throw new ConfigurationException("Mandatory config param password not set"); + } + + factory.setPassword(config.getPassword()); + + if (config.getVirtualHost() != null) { + factory.setVirtualHost(config.getVirtualHost()); + } + + checkSSL(factory,config); + factory.setConnectionTimeout(120000); + if (config.getConnectionTimeout() != null) { + try { + int timeout = Integer.parseInt(config.getConnectionTimeout()); + factory.setConnectionTimeout(timeout); + } catch (NumberFormatException ex) { + } + } + + return factory.newConnection(addresses); + } + + private static void checkSSL(ConnectionFactory factory, RabbitMqClientConfig config) throws Exception { + + // Check if any of SSL params is configured + if (config.getEnableSsl()) { + if (config.getSslKeyStoreFile() == null || config.getSslKeyStorePassword() == null + || config.getSslTrustStoreFile() == null || config.getSslTrustStorePassword() == null) { + throw new ConfigurationException( + "Missing SSL configuration : sslKeyStoreFile , sslKeyStorePassword , sslTrustStorePassword or sslTrustStoreFile"); + } + char[] keyPassphrase = config.getSslKeyStorePassword().toCharArray(); + KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(new FileInputStream(config.getSslKeyStoreFile()), keyPassphrase); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, keyPassphrase); + + char[] trustPassphrase = config.getSslTrustStorePassword().toCharArray(); + KeyStore tks = KeyStore.getInstance("JKS"); + tks.load(new FileInputStream(config.getSslTrustStoreFile()), trustPassphrase); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(tks); + SSLContext c = SSLContext.getInstance("TLSv1.2"); + c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); + factory.useSslProtocol(c); + } + + } + +} diff --git a/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java index df4c54b..79f9914 100644 --- a/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java +++ b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqConsumer.java @@ -65,10 +65,33 @@ public class TestRabbitMqConsumer { BuiltinExchangeType topicEx = BuiltinExchangeType.TOPIC; TimeUnit unit = TimeUnit.MILLISECONDS; Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); + + RabbitMqClientConfig config1 = new RabbitMqClientConfig(); + RabbitMqClientConfig config2 = new RabbitMqClientConfig(); + RabbitMqClientConfig config3 = new RabbitMqClientConfig(); + @Before public void init() throws Exception { - RabbitMqEventConsumer.setConnectionFactory(mockConnectionFactory); + RabbitMqUtils.setConnectionFactory(mockConnectionFactory); + + config1.setHosts("host1:1234"); + config1.setUsername("user"); + config1.setPassword("secret"); + config1.setExchangeName("my-exchange"); + config1.setQueue("my-queue"); + + config2.setHosts("host1:1234"); + config2.setUsername("user"); + config2.setPassword("secret"); + config2.setExchangeName("my-exchange"); + config2.setExchangeType(BuiltinExchangeType.DIRECT.name()); + config2.setQueue("my-queue"); + + config3.setHosts("host1:1234,host2:5678"); + config3.setUsername("user"); + config3.setPassword("secret"); + config3.setQueue("my-queue"); } @Test @@ -77,11 +100,9 @@ public class TestRabbitMqConsumer { Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); - new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); - new RabbitMqEventConsumer("hosts", 0, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName", "queueName"); - new RabbitMqEventConsumer(hostPortMap, "userName", "password", "exchangeName", "queueName"); - new RabbitMqEventConsumer(hostPortMap, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName", "queueName"); - new RabbitMqEventConsumer("hosts", 0, "userName", "password", "queue"); + new RabbitMqEventConsumer(config1); + new RabbitMqEventConsumer(config2); + new RabbitMqEventConsumer(config3); } @Test @@ -90,7 +111,7 @@ public class TestRabbitMqConsumer { Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); - RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer(config1); consumer.consume(); consumer.consumeWithOffsets(); consumer.consumeAndCommit(); @@ -103,7 +124,7 @@ public class TestRabbitMqConsumer { Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); Mockito.when(mockChannel.basicConsume(Mockito.any(), Mockito.anyBoolean(), Mockito.any(Consumer.class))).thenReturn(Mockito.anyString()); - RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer(config1); consumer.commitOffsets(); consumer.commitOffsets(0L); consumer.close(); @@ -118,7 +139,7 @@ public class TestRabbitMqConsumer { List<MessageWithOffset> records = buildTestMessages(2); mqueue = new ArrayBlockingQueue<>(2); mqueue.addAll(records); - RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer(config1); consumer.setMessageQueue(mqueue); consumer.consumeAndCommit(); consumer.close(); @@ -133,7 +154,7 @@ public class TestRabbitMqConsumer { List<MessageWithOffset> records = buildTestMessages(2); mqueue = new ArrayBlockingQueue<>(2); mqueue.addAll(records); - RabbitMqEventConsumer consumer = new RabbitMqEventConsumer("hosts", 0, "userName", "password", "exchangeName", "queueName"); + RabbitMqEventConsumer consumer = new RabbitMqEventConsumer(config1); consumer.setMessageQueue(mqueue); consumer.consumeWithOffsets(); consumer.commitOffsets(); diff --git a/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqPublisher.java b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqPublisher.java index e41126f..54384ed 100644 --- a/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqPublisher.java +++ b/event-client-rabbitmq/src/test/java/org/onap/aai/event/client/TestRabbitMqPublisher.java @@ -55,69 +55,59 @@ public class TestRabbitMqPublisher { BuiltinExchangeType topicEx = BuiltinExchangeType.TOPIC; Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); + RabbitMqClientConfig config1 = new RabbitMqClientConfig(); + RabbitMqClientConfig config2 = new RabbitMqClientConfig(); + RabbitMqClientConfig config3 = new RabbitMqClientConfig(); @Before public void init() throws Exception { - RabbitMqEventPublisher.setConnectionFactory(mockConnectionFactory); + RabbitMqUtils.setConnectionFactory(mockConnectionFactory); + + config1.setHosts("host1:1234"); + config1.setUsername("user"); + config1.setPassword("secret"); + config1.setExchangeName("my-exchange"); + + config2.setHosts("host1:1234"); + config2.setUsername("user"); + config2.setPassword("secret"); + config2.setExchangeName("my-exchange"); + config2.setExchangeType(BuiltinExchangeType.DIRECT.name()); } @Test public void testConstructors() throws Exception { - Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); - Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); - Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); - new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); - new RabbitMqEventPublisher("hosts", 0, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName"); - new RabbitMqEventPublisher(hostPortMap, "userName", "password", "exchangeName"); - new RabbitMqEventPublisher(hostPortMap, "userName", "password", BuiltinExchangeType.DIRECT.name(), "exchangeName"); - new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap<String, Object>()); + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + new RabbitMqEventPublisher(config1); + new RabbitMqEventPublisher(config2); } - + @Test public void publishSynchronous() throws Exception { - Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); - Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); - Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); - RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); - publisher.sendSync(""); - publisher.sendSync(Arrays.asList("")); - publisher.sendSync("key", ""); + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher(config1); + publisher.sendSync(""); + publisher.sendSync(Arrays.asList("")); + publisher.sendSync("key", ""); publisher.sendSync("key", Arrays.asList("")); publisher.close(); } @Test public void publishAsynchronous() throws Exception { - Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); - Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); - Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); - RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "exchangeName"); - publisher.sendAsync(""); - publisher.sendAsync(Arrays.asList("")); - publisher.sendAsync("key", ""); - publisher.sendAsync("key", Arrays.asList("")); - publisher.close(); - } - @Test(expected = UnsupportedOperationException.class) - public void UnsupportedMessageTest() throws Exception { - Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); - Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); - Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); - RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap<String, Object>()); - publisher.sendSync("key", ""); - publisher.sendSync("key", Arrays.asList("")); - publisher.close(); - } - - @Test(expected = UnsupportedOperationException.class) - public void UnSupportedMultiMessageTests() throws Exception { - Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); - Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); - Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); - RabbitMqEventPublisher publisher = new RabbitMqEventPublisher("hosts", 0, "userName", "password", "queueName",new HashMap<String, Object>()); - publisher.sendSync("key", Arrays.asList("")); - publisher.close(); + Mockito.when(mockConnectionFactory.newConnection(Mockito.anyListOf(Address.class))).thenReturn(mockConnection); + Mockito.when(mockConnection.createChannel()).thenReturn(mockChannel); + Mockito.when(mockChannel.exchangeDeclare(Mockito.any(), Mockito.eq(topicEx), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyMap())).thenReturn(mockDeclareOK); + RabbitMqEventPublisher publisher = new RabbitMqEventPublisher(config1); + publisher.sendAsync(""); + publisher.sendAsync(Arrays.asList("")); + publisher.sendAsync("key", ""); + publisher.sendAsync("key", Arrays.asList("")); + publisher.close(); } - } |