diff options
Diffstat (limited to 'event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java')
-rw-r--r-- | event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java | 287 |
1 files changed, 63 insertions, 224 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java index a7e0ee3..614ae54 100644 --- a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java @@ -21,23 +21,16 @@ package org.onap.aai.event.client; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; + +import javax.naming.ConfigurationException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.event.api.EventPublisher; -import com.rabbitmq.client.AMQP.Exchange.DeclareOk; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; /** * Event bus client publisher wrapper for RabbitMQ @@ -46,236 +39,82 @@ import com.rabbitmq.client.ConnectionFactory; */ public class RabbitMqEventPublisher implements EventPublisher { - private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private static ConnectionFactory factory = new ConnectionFactory(); + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); - private final Connection connection; - private final Channel channel; - private final String exchangeName; - private final String queueName; - private final String routingKey; - private DeclareOk exchangeInfo; - private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + private final Connection connection; + private final Channel channel; - static void setConnectionFactory(ConnectionFactory connFactory) { - factory = connFactory; - } - /** - * Constructor for single host port for a topic exchange - * @param host - * @param port - * @param userName - * @param password - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeName) throws Exception { - this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); - } - - /** - * Constructor for single host port for a specific exchange type - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName) throws Exception { - this(host,port, userName, password, exchangeType, exchangeName,""); - } - - /** - * Constructor for single host port for a specific exchange type and specific routing key - * @param host - * @param port - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param routingKey - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { - this(buildMap(host,port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); - } - - /** - * Constructor for multiple host and port for a topic exchange - * @param hostPortMap - * @param userName - * @param password - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); - } - - /** - * Constructor for multiple host port for a specific exchange type - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS - * @param exchangeName - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName) throws Exception { - this(hostPortMap, userName, password, exchangeType , exchangeName,""); - } - - /** - * Constructor for multiple host port for a specific exchange type and specific routing key - * @param hostPortMap - * @param userName - * @param password - * @param exchangeType - * @param exchangeName - * @param routingKey - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { - this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); - } + private RabbitMqClientConfig config; + + public RabbitMqEventPublisher(RabbitMqClientConfig config) throws Exception { + this.config = config; + + if (config.getExchangeName() == null) { + throw new ConfigurationException("Mandatory config param exchangeName not set"); + } + + this.connection = RabbitMqUtils.createConnection(config); + this.channel = connection.createChannel(); - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String routingKey, Map<String, Object> exchangeArguments) throws Exception { - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - this.exchangeName = exchangeName; - exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); //Durable exchange and non delete - queueName = null; - this.routingKey=routingKey; - } - - /** - * Constructor for single host port for a queue connection - * @param host - * @param port - * @param userName - * @param password - * @param queue - * @param queueArguments - * @throws Exception - */ - public RabbitMqEventPublisher(String host, int port, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - this(buildMap(host, port), userName, password, queue, queueArguments); - } - - /** - * Constructor for multiple host port for a queue connection - * @param hostPortMap - * @param userName - * @param password - * @param queue - * @param queueArguments - * @throws Exception - */ - public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { - List<Address> addresses = new ArrayList<Address>(); - Iterator<String> iter = hostPortMap.keySet().iterator(); - while (iter.hasNext()) - { - String host = iter.next(); - int port = hostPortMap.get(host); - Address add = new Address(host,port); - addresses.add(add); - } - factory.setUsername(userName); - factory.setPassword(password); - connection = factory.newConnection(addresses); - channel = connection.createChannel(); - exchangeName = ""; - routingKey=queue; - this.queueName = queue; - queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); //Durable, non exclusive and non auto delete queue - } - - private static Map<String, Integer> buildMap(String host, Integer port) { - Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); - hostPortMap.put(host, port); - return hostPortMap; - } + //Durable exchange and non delete + channel.exchangeDeclare(config.getExchangeName(), config.getExchangeType(), true, false, config.getExchangeArguments()); + } - @Override - public void close() throws Exception { - channel.close(); - connection.close(); - } + @Override + public void close() throws Exception { + channel.close(); + connection.close(); + } - @Override - public void sendAsync(String message) throws Exception { - sendSync(message); - } + @Override + public void sendAsync(String message) throws Exception { + sendSync(message); + } - @Override - public void sendAsync(Collection<String> messages) throws Exception { - sendSync(messages); - } + @Override + public void sendAsync(Collection<String> messages) throws Exception { + sendSync(messages); + } - @Override - public void sendAsync(String routingParam, String message) throws Exception { - sendSync(routingParam, message); - } + @Override + public void sendAsync(String routingParam, String message) throws Exception { + sendSync(routingParam, message); + } - @Override - public void sendAsync(String routingParam, Collection<String> messages) throws Exception { - sendSync(routingParam, messages); - } + @Override + public void sendAsync(String routingParam, Collection<String> messages) throws Exception { + sendSync(routingParam, messages); + } - @Override - public int sendSync(String message) throws Exception { - channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); - log.debug(" [x] Sent '" + message + "'"); - return 1; - } + @Override + public int sendSync(String message) throws Exception { + channel.basicPublish(config.getExchangeName(), config.getRoutingKey(), null, message.getBytes()); + log.debug(" [x] Sent '" + message + "'"); + return 1; + } - @Override - public int sendSync(Collection<String> messages) throws Exception { - log.debug("Publishing" + messages.size() + " messages "); + @Override + public int sendSync(Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); for (String message : messages) { sendSync(message); } return messages.size(); - } + } - @Override - public int sendSync(String routingParam, String message) throws Exception { - //Can only route if exchange is setup - if(queueName == null) { - channel.basicPublish(exchangeName, routingParam, null, message.getBytes()); - return 1; - } - else - throw new UnsupportedOperationException("Routing without exchange"); - - } + @Override + public int sendSync(String routingParam, String message) throws Exception { + channel.basicPublish(config.getExchangeName(), routingParam, null, message.getBytes()); + return 1; + } - @Override - public int sendSync(String routingParam, Collection<String> messages) throws Exception { - log.debug("Publishing" + messages.size() + " messages "); - //Can only route if exchange is setup - if(queueName == null) { - for (String message : messages) { - sendSync(routingParam, message); - } - return messages.size(); - } - else - throw new UnsupportedOperationException("Routing without exchange"); - } + @Override + public int sendSync(String routingParam, Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); + for (String message : messages) { + sendSync(routingParam, message); + } + return messages.size(); + } } |