diff options
author | FengLiang <feng.liang1@zte.com.cn> | 2017-02-20 18:05:32 +0800 |
---|---|---|
committer | FengLiang <feng.liang1@zte.com.cn> | 2017-02-21 09:47:33 +0800 |
commit | f7907354daf58f6a729556e21ba01cae777de0be (patch) | |
tree | 5aacddc1ef5c00c227a4d09af885605c53eccd3a /engine-d/src/main/java | |
parent | a4f06eee9b2cf0bea405a32bfa39ae2733c292e4 (diff) |
Modify engine mq
mq functional testing
Change-Id: Ide2891590cd814e0787684edd7384554402532cb
Issue-ID:HOLMES-19
Signed-off-by: FengLiang <feng.liang1@zte.com.cn>
Diffstat (limited to 'engine-d/src/main/java')
-rw-r--r-- | engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java | 83 | ||||
-rw-r--r-- | engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java | 73 |
2 files changed, 66 insertions, 90 deletions
diff --git a/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java b/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java deleted file mode 100644 index f1bc622..0000000 --- a/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.openo.holmes.enginemgt.listener; - - -import javax.annotation.PostConstruct; -import javax.inject.Inject; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.glassfish.hk2.api.IterableProvider; -import org.jvnet.hk2.annotations.Service; -import org.openo.holmes.common.api.stat.Alarm; -import org.openo.holmes.common.config.MQConfig; -import org.openo.holmes.common.constant.AlarmConst; -import org.openo.holmes.enginemgt.manager.DroolsEngine; - -@Service -@Slf4j -public class AlarmMqMessageListener implements Runnable { - - @Inject - private static IterableProvider<MQConfig> mqConfigProvider; - @Inject - DroolsEngine droolsEngine; - private ConnectionFactory connectionFactory; - - @PostConstruct - public void init() { - - String brokerURL = - "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; - connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, - mqConfigProvider.get().brokerPassword, brokerURL); - } - - - public void run() { - Connection connection; - Session session; - Destination destination; - MessageConsumer messageConsumer; - - try { - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); - messageConsumer = session.createConsumer(destination); - - while (true) { - ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000); - if (objMessage != null) { - droolsEngine.putRaisedIntoStream((Alarm) objMessage.getObject()); - } else { - break; - } - } - } catch (JMSException e) { - log.debug("Receive alarm failure" + e.getMessage()); - } - - } -} diff --git a/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java b/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java index 49b2042..69fe748 100644 --- a/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,16 @@ import java.util.List; import java.util.Locale; import javax.annotation.PostConstruct; import javax.inject.Inject; +import javax.inject.Singleton; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Session; import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.ActiveMQConnectionFactory; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; @@ -34,15 +43,17 @@ import org.drools.io.Resource; import org.drools.io.ResourceFactory; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; +import org.glassfish.hk2.api.IterableProvider; import org.jvnet.hk2.annotations.Service; import org.openo.holmes.common.api.entity.CorrelationRule; import org.openo.holmes.common.api.stat.Alarm; +import org.openo.holmes.common.config.MQConfig; +import org.openo.holmes.common.constant.AlarmConst; import org.openo.holmes.common.exception.DbException; import org.openo.holmes.common.exception.EngineException; import org.openo.holmes.common.exception.RuleIllegalityException; import org.openo.holmes.common.utils.ExceptionUtil; import org.openo.holmes.common.utils.I18nProxy; -import org.openo.holmes.enginemgt.listener.AlarmMqMessageListener; import org.openo.holmes.enginemgt.request.DeployRuleRequest; import org.openo.holmes.enginemgt.wrapper.RuleMgtWrapper; @@ -51,22 +62,34 @@ import org.openo.holmes.enginemgt.wrapper.RuleMgtWrapper; public class DroolsEngine { private final static String CORRELATION_RULE = "CORRELATION_RULE"; + private final static String CORRELATION_ALARM = "CORRELATION_ALARM"; + private final static int ENABLE = 1; + @Inject private RuleMgtWrapper ruleMgtWrapper; - @Inject - private AlarmMqMessageListener mqRegister; + private KnowledgeBase kbase; + private KnowledgeBaseConfiguration kconf; + private StatefulKnowledgeSession ksession; + private KnowledgeBuilder kbuilder; + @Inject + private IterableProvider<MQConfig> mqConfigProvider; + + private ConnectionFactory connectionFactory; + @PostConstruct private void init() { - registerAlarmTopicListener(); try { + // 1. start engine start(); + // 2. start mq listener + registerAlarmTopicListener(); } catch (Exception e) { log.error("Start service failed: " + e.getMessage()); throw ExceptionUtil.buildExceptionResponse("Start service failed!"); @@ -74,7 +97,12 @@ public class DroolsEngine { } private void registerAlarmTopicListener() { - Thread thread = new Thread(mqRegister); + String brokerURL = + "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; + connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, + mqConfigProvider.get().brokerPassword, brokerURL); + + Thread thread = new Thread(new AlarmMqMessageListener()); thread.start(); } @@ -83,7 +111,7 @@ public class DroolsEngine { log.info("Drools Egine Initialize Begining ... "); initEngineParameter(); - initDeployRule(); +// initDeployRule(); log.info("Business Rule Egine Initialize Successfully "); } @@ -212,4 +240,35 @@ public class DroolsEngine { this.ksession.insert(raiseAlarm); this.ksession.fireAllRules(); } + + class AlarmMqMessageListener implements Runnable { + + public void run() { + Connection connection; + Session session; + Destination destination; + MessageConsumer messageConsumer; + + try { + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); + messageConsumer = session.createConsumer(destination); + + while (true) { + ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000); + if (objMessage != null) { + putRaisedIntoStream((Alarm) objMessage.getObject()); + } else { + break; + } + } + } catch (JMSException e) { + log.error("connection mq service Failed: " + e.getMessage()); + } + + } + } + } |