summaryrefslogtreecommitdiffstats
path: root/engine-d/src/main/java
diff options
context:
space:
mode:
authorFengLiang <feng.liang1@zte.com.cn>2017-02-20 18:05:32 +0800
committerFengLiang <feng.liang1@zte.com.cn>2017-02-21 09:47:33 +0800
commitf7907354daf58f6a729556e21ba01cae777de0be (patch)
tree5aacddc1ef5c00c227a4d09af885605c53eccd3a /engine-d/src/main/java
parenta4f06eee9b2cf0bea405a32bfa39ae2733c292e4 (diff)
Modify engine mq
mq functional testing Change-Id: Ide2891590cd814e0787684edd7384554402532cb Issue-ID:HOLMES-19 Signed-off-by: FengLiang <feng.liang1@zte.com.cn>
Diffstat (limited to 'engine-d/src/main/java')
-rw-r--r--engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java83
-rw-r--r--engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java73
2 files changed, 66 insertions, 90 deletions
diff --git a/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java b/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java
deleted file mode 100644
index f1bc622..0000000
--- a/engine-d/src/main/java/org/openo/holmes/enginemgt/listener/AlarmMqMessageListener.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright 2017 ZTE Corporation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.openo.holmes.enginemgt.listener;
-
-
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.glassfish.hk2.api.IterableProvider;
-import org.jvnet.hk2.annotations.Service;
-import org.openo.holmes.common.api.stat.Alarm;
-import org.openo.holmes.common.config.MQConfig;
-import org.openo.holmes.common.constant.AlarmConst;
-import org.openo.holmes.enginemgt.manager.DroolsEngine;
-
-@Service
-@Slf4j
-public class AlarmMqMessageListener implements Runnable {
-
- @Inject
- private static IterableProvider<MQConfig> mqConfigProvider;
- @Inject
- DroolsEngine droolsEngine;
- private ConnectionFactory connectionFactory;
-
- @PostConstruct
- public void init() {
-
- String brokerURL =
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
- mqConfigProvider.get().brokerPassword, brokerURL);
- }
-
-
- public void run() {
- Connection connection;
- Session session;
- Destination destination;
- MessageConsumer messageConsumer;
-
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- messageConsumer = session.createConsumer(destination);
-
- while (true) {
- ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
- if (objMessage != null) {
- droolsEngine.putRaisedIntoStream((Alarm) objMessage.getObject());
- } else {
- break;
- }
- }
- } catch (JMSException e) {
- log.debug("Receive alarm failure" + e.getMessage());
- }
-
- }
-}
diff --git a/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java b/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java
index 49b2042..69fe748 100644
--- a/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java
+++ b/engine-d/src/main/java/org/openo/holmes/enginemgt/manager/DroolsEngine.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,16 @@ import java.util.List;
import java.util.Locale;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
@@ -34,15 +43,17 @@ import org.drools.io.Resource;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
+import org.glassfish.hk2.api.IterableProvider;
import org.jvnet.hk2.annotations.Service;
import org.openo.holmes.common.api.entity.CorrelationRule;
import org.openo.holmes.common.api.stat.Alarm;
+import org.openo.holmes.common.config.MQConfig;
+import org.openo.holmes.common.constant.AlarmConst;
import org.openo.holmes.common.exception.DbException;
import org.openo.holmes.common.exception.EngineException;
import org.openo.holmes.common.exception.RuleIllegalityException;
import org.openo.holmes.common.utils.ExceptionUtil;
import org.openo.holmes.common.utils.I18nProxy;
-import org.openo.holmes.enginemgt.listener.AlarmMqMessageListener;
import org.openo.holmes.enginemgt.request.DeployRuleRequest;
import org.openo.holmes.enginemgt.wrapper.RuleMgtWrapper;
@@ -51,22 +62,34 @@ import org.openo.holmes.enginemgt.wrapper.RuleMgtWrapper;
public class DroolsEngine {
private final static String CORRELATION_RULE = "CORRELATION_RULE";
+
private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
+
private final static int ENABLE = 1;
+
@Inject
private RuleMgtWrapper ruleMgtWrapper;
- @Inject
- private AlarmMqMessageListener mqRegister;
+
private KnowledgeBase kbase;
+
private KnowledgeBaseConfiguration kconf;
+
private StatefulKnowledgeSession ksession;
+
private KnowledgeBuilder kbuilder;
+ @Inject
+ private IterableProvider<MQConfig> mqConfigProvider;
+
+ private ConnectionFactory connectionFactory;
+
@PostConstruct
private void init() {
- registerAlarmTopicListener();
try {
+ // 1. start engine
start();
+ // 2. start mq listener
+ registerAlarmTopicListener();
} catch (Exception e) {
log.error("Start service failed: " + e.getMessage());
throw ExceptionUtil.buildExceptionResponse("Start service failed!");
@@ -74,7 +97,12 @@ public class DroolsEngine {
}
private void registerAlarmTopicListener() {
- Thread thread = new Thread(mqRegister);
+ String brokerURL =
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
+ mqConfigProvider.get().brokerPassword, brokerURL);
+
+ Thread thread = new Thread(new AlarmMqMessageListener());
thread.start();
}
@@ -83,7 +111,7 @@ public class DroolsEngine {
log.info("Drools Egine Initialize Begining ... ");
initEngineParameter();
- initDeployRule();
+// initDeployRule();
log.info("Business Rule Egine Initialize Successfully ");
}
@@ -212,4 +240,35 @@ public class DroolsEngine {
this.ksession.insert(raiseAlarm);
this.ksession.fireAllRules();
}
+
+ class AlarmMqMessageListener implements Runnable {
+
+ public void run() {
+ Connection connection;
+ Session session;
+ Destination destination;
+ MessageConsumer messageConsumer;
+
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ messageConsumer = session.createConsumer(destination);
+
+ while (true) {
+ ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
+ if (objMessage != null) {
+ putRaisedIntoStream((Alarm) objMessage.getObject());
+ } else {
+ break;
+ }
+ }
+ } catch (JMSException e) {
+ log.error("connection mq service Failed: " + e.getMessage());
+ }
+
+ }
+ }
+
}