From b464eec16a30627bf5237b71ba0088d458640c22 Mon Sep 17 00:00:00 2001 From: FengLiang Date: Mon, 6 Mar 2017 15:04:36 +0800 Subject: Modify MQ listener mode Change-Id: Ided322b55ed1baff351edb03239fc06e32da3844 Issue-ID: HOLMES-47 Signed-off-by: FengLiang --- .../openo/holmes/engine/manager/DroolsEngine.java | 78 ++++++++++++++-------- 1 file changed, 49 insertions(+), 29 deletions(-) (limited to 'engine-d/src/main/java') diff --git a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java index 0ed452a..d9f55ed 100644 --- a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java @@ -16,6 +16,7 @@ package org.openo.holmes.engine.manager; +import java.io.Serializable; import java.io.StringReader; import java.util.List; import java.util.Locale; @@ -25,11 +26,13 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; +import javax.jms.MessageListener; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQObjectMessage; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; @@ -57,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper; @Slf4j @Service public class DroolsEngine { - - private final static String CORRELATION_RULE = "CORRELATION_RULE"; - - private final static String CORRELATION_ALARM = "CORRELATION_ALARM"; - private final static int ENABLE = 1; @Inject @@ -99,8 +97,8 @@ public class DroolsEngine { connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, mqConfigProvider.get().brokerPassword, brokerURL); - Thread thread = new Thread(new AlarmMqMessageListener()); - thread.start(); + AlarmMqMessageListener listener = new AlarmMqMessageListener(); + listener.receive(); } @@ -258,34 +256,56 @@ public class DroolsEngine { this.ksession.fireAllRules(); } - class AlarmMqMessageListener implements Runnable { + class AlarmMqMessageListener implements MessageListener { + + private Connection connection = null; + private Session session = null; + private Destination destination = null; + private MessageConsumer consumer = null; - public void run() { - Connection connection; - Session session; - Destination destination; - MessageConsumer messageConsumer; + private void initialize() throws JMSException { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); + consumer = session.createConsumer(destination); + connection.start(); + } + + public void receive() { + try { + initialize(); + consumer.setMessageListener(this); + } catch (JMSException e) { + log.error("Failed to connect to the MQ service : " + e.getMessage(), e); + try { + close(); + } catch (JMSException e1) { + log.error("Failed close connection " + e1.getMessage(), e1); + } + } + } + public void onMessage(Message arg0) { + ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; try { - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); - messageConsumer = session.createConsumer(destination); - - while (true) { - ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000); - if (objMessage != null) { - putRaisedIntoStream((Alarm) objMessage.getObject()); - } else { - break; - } + Serializable object = objectMessage.getObject(); + + if (object instanceof Alarm) { + Alarm alarm = (Alarm) object; + putRaisedIntoStream(alarm); } } catch (JMSException e) { - log.error("connection mq service Failed: " + e.getMessage(), e); + log.error("Failed get object : " + e.getMessage(), e); } + } + private void close() throws JMSException { + if (consumer != null) + consumer.close(); + if (session != null) + session.close(); + if (connection != null) + connection.close(); } } - } -- cgit 1.2.3-korg