diff options
author | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-09-12 19:06:49 +0800 |
---|---|---|
committer | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-09-12 19:06:49 +0800 |
commit | 9205cee30a28456ca29918a9c2036e51a2feb301 (patch) | |
tree | b771afaaef20a53980831245ea03e710016daf84 /engine-d | |
parent | 3c2a7c4614ed0b925a7ac9d0c2ff4907c89e6b51 (diff) |
add MQ consumer
Issue-ID: HOLMES-47
Change-Id: I16109e668ca20819af2e53de906387ec12907e7b
Signed-off-by: Shiwei Tian <tian.shiwei@zte.com.cn>
Diffstat (limited to 'engine-d')
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java | 114 | ||||
-rw-r--r-- | engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java | 189 |
2 files changed, 303 insertions, 0 deletions
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java new file mode 100644 index 0000000..bd77312 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java @@ -0,0 +1,114 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.mqconsumer; + +import java.io.Serializable; +import javax.inject.Inject; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.glassfish.hk2.api.IterableProvider; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.config.MQConfig; +import org.onap.holmes.common.constant.AlarmConst; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +@Slf4j +@NoArgsConstructor +public class MQConsumer { + + @Inject + private IterableProvider<MQConfig> mqConfigProvider; + private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory1; + @Inject + private DroolsEngine engine; + + public void registerAlarmTopicListener() { + String brokerURL = + "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; + connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, + mqConfigProvider.get().brokerPassword, brokerURL); + + AlarmMqMessageListener listener = new AlarmMqMessageListener(); + listener.receive(); + } + class AlarmMqMessageListener implements MessageListener { + + private Connection connection = null; + private Session session = null; + private Destination destination = null; + private MessageConsumer consumer = null; + + private void initialize() throws JMSException { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); + consumer = session.createConsumer(destination); + connection.start(); + } + + public void receive() { + try { + initialize(); + consumer.setMessageListener(this); + } catch (JMSException e) { + log.error("Failed to connect to the MQ service : " + e.getMessage(), e); + try { + close(); + } catch (JMSException e1) { + log.error("Failed close connection " + e1.getMessage(), e1); + } + } + } + + public void onMessage(Message arg0) { + ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; + try { + Serializable object = objectMessage.getObject(); + if (object instanceof VesAlarm) { + VesAlarm vesAlarm = (VesAlarm) object; + engine.putRaisedIntoStream(vesAlarm); + } + } catch (JMSException e) { + log.error("Failed get object : " + e.getMessage(), e); + } + } + + private void close() throws JMSException { + if (consumer != null) { + consumer.close(); + } + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java new file mode 100644 index 0000000..b1ea3cb --- /dev/null +++ b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java @@ -0,0 +1,189 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.engine.mqconsumer; + +import static org.easymock.EasyMock.anyBoolean; +import static org.easymock.EasyMock.anyInt; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; + + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.easymock.EasyMock; +import org.glassfish.hk2.api.IterableProvider; +import org.junit.Before; +import org.junit.Test; +import org.onap.holmes.common.api.stat.Alarm; +import org.onap.holmes.common.config.MQConfig; +import org.powermock.api.easymock.PowerMock; +import org.powermock.reflect.Whitebox; + +public class MQConsumerTest { + + private IterableProvider<MQConfig> mqConfigProvider; + + private ConnectionFactory connectionFactory; + + private MQConsumer mqConsumer; + + private MQConsumer mqConsumer1; + + private MQConsumer mqConsumer2; + + @Before + public void setUp() { + + mqConsumer = new MQConsumer(); + + mqConfigProvider = PowerMock.createMock(IterableProvider.class); + connectionFactory = PowerMock.createMock(ConnectionFactory.class); + + Whitebox.setInternalState(mqConsumer, "mqConfigProvider", mqConfigProvider); + Whitebox.setInternalState(mqConsumer, "connectionFactory", connectionFactory); + } + + @Test + public void init() throws Exception { + MQConfig mqConfig = new MQConfig(); + mqConfig.brokerIp = "127.0.0.1"; + mqConfig.brokerPort = 4567; + mqConfig.brokerUsername = "admin"; + mqConfig.brokerPassword = "admin"; + + expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes(); + PowerMock.replayAll(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_receive() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_exception() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + consumer.close(); + session.close(); + connection.close(); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_close_exception() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + + Connection connection = PowerMock.createMock(Connection.class); + Session session = PowerMock.createMock(Session.class); + Destination destination = PowerMock.createMock(Topic.class); + MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class); + + Whitebox.setInternalState(listener, "connection", connection); + Whitebox.setInternalState(listener, "session", session); + Whitebox.setInternalState(listener, "destination", destination); + Whitebox.setInternalState(listener, "consumer", consumer); + + PowerMock.reset(); + + expect(connectionFactory.createConnection()).andReturn(connection); + connection.start(); + expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session); + expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination); + expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer); + consumer.setMessageListener(listener); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + consumer.close(); + EasyMock.expectLastCall().andThrow(new JMSException("")); + + PowerMock.replayAll(); + + listener.receive(); + + PowerMock.verifyAll(); + } + + @Test + public void listener_on_message() throws JMSException { + MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener(); + Alarm alarm = new Alarm(); + alarm.setAlarmKey("alarmKey"); + ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage(); + objectMessage.setObject(alarm); + + listener.onMessage(objectMessage); + } +} |