diff options
author | Gurjeet Bedi <gurjeetb@amdocs.com> | 2018-06-05 13:13:54 -0400 |
---|---|---|
committer | Gurjeet Bedi <gurjeetb@amdocs.com> | 2018-06-07 10:02:28 -0400 |
commit | a19b69d1809955293390c8abd868d0eaff318e5b (patch) | |
tree | 78376699ef24570dc8413075ecf5842127e55c2a /event-client-rabbitmq/src/main | |
parent | 1101277e3bf85ffae75b5a44c9149dd95b30c9b6 (diff) |
Event Client library support for RabbitMQ
Update version in RabbitMQ Event Client lib
License header update
Issue-ID: AAI-1205
Change-Id: I838c184a0b81d89d5d276fbeee373cf5c390fb22
Signed-off-by: Gurjeet Bedi <gurjeetb@amdocs.com>
Diffstat (limited to 'event-client-rabbitmq/src/main')
2 files changed, 593 insertions, 0 deletions
diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java new file mode 100644 index 0000000..c564198 --- /dev/null +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventConsumer.java @@ -0,0 +1,312 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.event.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.MessageWithOffset; + +import com.rabbitmq.client.Address; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +/** + * Event bus client consumer wrapper for RabbitMQ. + * This will open a async consumer which will put messages on internal queue so that calling client can then + * consume and ackowledge the same based on offset/deliveryTag. + * Topic exchange type is powerful and can behave like other exchanges. + * When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange. + * When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one + * @author GURJEETB + * + */ +public class RabbitMqEventConsumer implements EventConsumer { + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventConsumer.class); + private static ConnectionFactory factory = new ConnectionFactory(); + private BlockingQueue<MessageWithOffset> messageQueue; + + private static String BINDING_CONSUME_ALL = "#"; + + private final Connection connection; + private final Channel channel; + private final String queueName; + private DeclareOk exchangeInfo; + private Long lastDeliveryTag; + private long timeout = 5000; + private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + + /** + * (intended for testing prupose only) + * @param connFactory + */ + static void setConnectionFactory(ConnectionFactory connFactory) { + factory = connFactory; + } + + /** + * (intended for testing prupose only) + * @param messageQueue + */ + public void setMessageQueue(BlockingQueue<MessageWithOffset> messageQueue) { + this.messageQueue = messageQueue; + } + + /** + * Constructor to open a consumer on single host port for a topic exchange with specific queue name which will + * consume all messages from topic + * @param host + * @param port + * @param userName + * @param password + * @param exchangeName + * @param queueName + * @throws Exception + */ + public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeName, String queueName) throws Exception { + this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); + } + + /** + * Constructor to open a consumer on single host port for a exchange with specific queue name which will + * consume all messages from topic + * @param host + * @param port + * @param userName + * @param password + * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS + * @param exchangeName + * @param queueName + * @throws Exception + */ + public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { + this(host, port, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); + } + + /** + * Constructor to open a consumer on single host port for a exchange with specific queue name + * @param host + * @param port + * @param userName + * @param password + * @param exchangeType + * @param exchangeName + * @param queueName + * @param bindingKey - Bind the queue to specific messages only + * @throws Exception + */ + public RabbitMqEventConsumer(String host, int port, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { + this(buildMap(host, port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); + } + + /** + * Constructor to open a consumer on multiple host port for a topic exchange with specific queue name which will + * consume all messages from topic + * @param hostPortMap + * @param userName + * @param password + * @param exchangeName + * @param queueName + * @throws Exception + */ + public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName, String queueName) throws Exception { + this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName, queueName, BINDING_CONSUME_ALL); + } + + /** + * Constructor to open a consumer on multiple host port for a exchange with specific queue name which will + * consume all messages from topic + * @param hostPortMap + * @param userName + * @param password + * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS + * @param exchangeName + * @param queueName + * @throws Exception + */ + public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName) throws Exception { + this(hostPortMap, userName, password, exchangeType, exchangeName, queueName, BINDING_CONSUME_ALL); + } + + /** + * Constructor to open a consumer on multiple host port for a exchange with specific queue name + * @param hostPortMap + * @param userName + * @param password + * @param exchangeType + * @param exchangeName + * @param queueName + * @param bindingKey - Bind the queue to specific messages only + * @throws Exception + */ + public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String queueName, String bindingKey) throws Exception { + this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName, queueName, bindingKey, new HashMap<String, Object>()); + } + + public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String queueName, String bindingKey, Map<String, Object> exchangeArguments) throws Exception { + messageQueue = new ArrayBlockingQueue<>(1000); + List<Address> addresses = new ArrayList<Address>(); + Iterator<String> iter = hostPortMap.keySet().iterator(); + while (iter.hasNext()) + { + String host = iter.next(); + int port = hostPortMap.get(host); + Address add = new Address(host,port); + addresses.add(add); + } + factory.setUsername(userName); + factory.setPassword(password); + connection = factory.newConnection(addresses); + channel = connection.createChannel(); + exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); + this.queueName = queueName; + channel.queueDeclare(queueName, true, false, false, null); + channel.queueBind(queueName, exchangeName, bindingKey); + String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false + } + + public RabbitMqEventConsumer(String host, int port, String userName, String password, String queue) throws Exception { + this(buildMap(host, port), userName, password, queue, new HashMap<String, Object>()); + } + + public RabbitMqEventConsumer(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { + messageQueue = new ArrayBlockingQueue<>(1000); + List<Address> addresses = new ArrayList<Address>(); + Iterator<String> iter = hostPortMap.keySet().iterator(); + while (iter.hasNext()) + { + String host = iter.next(); + int port = hostPortMap.get(host); + Address add = new Address(host,port); + addresses.add(add); + } + factory.setUsername(userName); + factory.setPassword(password); + connection = factory.newConnection(addresses); + channel = connection.createChannel(); + this.queueName = queue; + queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); + String consumerTag = channel.basicConsume(queueName, false, new CallBackConsumer(channel)); //AutoAck is false + } + + private static Map<String, Integer> buildMap(String host, Integer port) { + Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); + hostPortMap.put(host, port); + return hostPortMap; + } + + @Override + public Iterable<String> consumeAndCommit() throws Exception { + Iterable<String> list = consume(); + commitOffsets(); + return list; + } + + @Override + public Iterable<String> consume() throws Exception { + List<String> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record.getMessage()); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public Iterable<MessageWithOffset> consumeWithOffsets() throws Exception { + List<MessageWithOffset> list = new ArrayList<>(); + MessageWithOffset record = null; + if(messageQueue.peek()!=null) { + do + { + record = messageQueue.poll(1000, TimeUnit.MILLISECONDS); + lastDeliveryTag = record.getOffset(); + list.add(record); + }while(messageQueue.peek()!=null); + } + return list; + } + + @Override + public void commitOffsets() throws Exception { + if(lastDeliveryTag != null) + { + channel.basicAck(lastDeliveryTag, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + lastDeliveryTag = null; + } + } + + @Override + public void commitOffsets(long offset) throws Exception { + channel.basicAck(offset, true); //Ack messages upto lastDeliveryTag or offset so that they can be marked + } + + /** + * Closes the channel + * @throws Exception + */ + public void close() throws Exception { + channel.close(); + connection.close(); + } + + class CallBackConsumer extends DefaultConsumer{ + CallBackConsumer(Channel channel) { + super(channel); + } + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { + String message = new String(body, "UTF-8"); + try + { + MessageWithOffset record = new MessageWithOffset(envelope.getDeliveryTag(), message); + messageQueue.offer(record, timeout, TimeUnit.MILLISECONDS); + } + catch(Exception e) + { + log.debug(" Got exception while handling message="+e.getMessage()+" Will be reposting to queue"); + channel.basicNack(envelope.getDeliveryTag(), false, true); //Explicit Ack with requeue + } + } + } +} diff --git a/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java new file mode 100644 index 0000000..a7e0ee3 --- /dev/null +++ b/event-client-rabbitmq/src/main/java/org/onap/aai/event/client/RabbitMqEventPublisher.java @@ -0,0 +1,281 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.event.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventPublisher; + +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Event bus client publisher wrapper for RabbitMQ + * @author GURJEETB + * + */ + +public class RabbitMqEventPublisher implements EventPublisher { + private static Logger log = LoggerFactory.getInstance().getLogger(RabbitMqEventPublisher.class); + private static ConnectionFactory factory = new ConnectionFactory(); + + private final Connection connection; + private final Channel channel; + private final String exchangeName; + private final String queueName; + private final String routingKey; + private DeclareOk exchangeInfo; + private com.rabbitmq.client.AMQP.Queue.DeclareOk queueInfo; + + static void setConnectionFactory(ConnectionFactory connFactory) { + factory = connFactory; + } + /** + * Constructor for single host port for a topic exchange + * @param host + * @param port + * @param userName + * @param password + * @param exchangeName + * @throws Exception + */ + public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeName) throws Exception { + this(host, port, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); + } + + /** + * Constructor for single host port for a specific exchange type + * @param host + * @param port + * @param userName + * @param password + * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS + * @param exchangeName + * @throws Exception + */ + public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName) throws Exception { + this(host,port, userName, password, exchangeType, exchangeName,""); + } + + /** + * Constructor for single host port for a specific exchange type and specific routing key + * @param host + * @param port + * @param userName + * @param password + * @param exchangeType + * @param exchangeName + * @param routingKey + * @throws Exception + */ + public RabbitMqEventPublisher(String host, int port, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { + this(buildMap(host,port), userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); + } + + /** + * Constructor for multiple host and port for a topic exchange + * @param hostPortMap + * @param userName + * @param password + * @param exchangeName + * @throws Exception + */ + public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeName) throws Exception { + this(hostPortMap, userName, password, BuiltinExchangeType.TOPIC.name(), exchangeName,""); + } + + /** + * Constructor for multiple host port for a specific exchange type + * @param hostPortMap + * @param userName + * @param password + * @param exchangeType - Supported values - DIRECT, FANOUT, TOPIC, HEADERS + * @param exchangeName + * @throws Exception + */ + public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName) throws Exception { + this(hostPortMap, userName, password, exchangeType , exchangeName,""); + } + + /** + * Constructor for multiple host port for a specific exchange type and specific routing key + * @param hostPortMap + * @param userName + * @param password + * @param exchangeType + * @param exchangeName + * @param routingKey + * @throws Exception + */ + public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String exchangeType, String exchangeName, String routingKey) throws Exception { + this(hostPortMap, userName, password, BuiltinExchangeType.valueOf(exchangeType), exchangeName,routingKey, new HashMap<String, Object>()); + } + + public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, BuiltinExchangeType exchangeType, String exchangeName, String routingKey, Map<String, Object> exchangeArguments) throws Exception { + List<Address> addresses = new ArrayList<Address>(); + Iterator<String> iter = hostPortMap.keySet().iterator(); + while (iter.hasNext()) + { + String host = iter.next(); + int port = hostPortMap.get(host); + Address add = new Address(host,port); + addresses.add(add); + } + factory.setUsername(userName); + factory.setPassword(password); + connection = factory.newConnection(addresses); + channel = connection.createChannel(); + this.exchangeName = exchangeName; + exchangeInfo = channel.exchangeDeclare(exchangeName, exchangeType, true, false, exchangeArguments); //Durable exchange and non delete + queueName = null; + this.routingKey=routingKey; + } + + /** + * Constructor for single host port for a queue connection + * @param host + * @param port + * @param userName + * @param password + * @param queue + * @param queueArguments + * @throws Exception + */ + public RabbitMqEventPublisher(String host, int port, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { + this(buildMap(host, port), userName, password, queue, queueArguments); + } + + /** + * Constructor for multiple host port for a queue connection + * @param hostPortMap + * @param userName + * @param password + * @param queue + * @param queueArguments + * @throws Exception + */ + public RabbitMqEventPublisher(Map<String, Integer> hostPortMap, String userName, String password, String queue, Map<String, Object> queueArguments) throws Exception { + List<Address> addresses = new ArrayList<Address>(); + Iterator<String> iter = hostPortMap.keySet().iterator(); + while (iter.hasNext()) + { + String host = iter.next(); + int port = hostPortMap.get(host); + Address add = new Address(host,port); + addresses.add(add); + } + factory.setUsername(userName); + factory.setPassword(password); + connection = factory.newConnection(addresses); + channel = connection.createChannel(); + exchangeName = ""; + routingKey=queue; + this.queueName = queue; + queueInfo = channel.queueDeclare(queueName, true, false, false, queueArguments); //Durable, non exclusive and non auto delete queue + } + + private static Map<String, Integer> buildMap(String host, Integer port) { + Map<String, Integer> hostPortMap = new HashMap<String, Integer>(); + hostPortMap.put(host, port); + return hostPortMap; + } + + @Override + public void close() throws Exception { + channel.close(); + connection.close(); + } + + @Override + public void sendAsync(String message) throws Exception { + sendSync(message); + } + + @Override + public void sendAsync(Collection<String> messages) throws Exception { + sendSync(messages); + } + + @Override + public void sendAsync(String routingParam, String message) throws Exception { + sendSync(routingParam, message); + } + + @Override + public void sendAsync(String routingParam, Collection<String> messages) throws Exception { + sendSync(routingParam, messages); + } + + @Override + public int sendSync(String message) throws Exception { + channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); + log.debug(" [x] Sent '" + message + "'"); + return 1; + } + + @Override + public int sendSync(Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); + for (String message : messages) { + sendSync(message); + } + return messages.size(); + } + + @Override + public int sendSync(String routingParam, String message) throws Exception { + //Can only route if exchange is setup + if(queueName == null) { + channel.basicPublish(exchangeName, routingParam, null, message.getBytes()); + return 1; + } + else + throw new UnsupportedOperationException("Routing without exchange"); + + } + + @Override + public int sendSync(String routingParam, Collection<String> messages) throws Exception { + log.debug("Publishing" + messages.size() + " messages "); + //Can only route if exchange is setup + if(queueName == null) { + for (String message : messages) { + sendSync(routingParam, message); + } + return messages.size(); + } + else + throw new UnsupportedOperationException("Routing without exchange"); + } + +} |