diff options
author | 6092002067 <wu.youbo@zte.com.cn> | 2017-03-14 17:11:36 +0800 |
---|---|---|
committer | 6092002067 <wu.youbo@zte.com.cn> | 2017-03-14 17:15:37 +0800 |
commit | e16b222eaaf94b75d669863f93ab48c509eae7ad (patch) | |
tree | 989600ed6925a8e64b25631b5d28c330fd50e5ed /engine-d/src/main/java | |
parent | fb73c92232ed387c64b1a175504d605797a6f505 (diff) |
Fix the rule engine not reset the fault
Issue-ID: HOLMES-47
Change-Id: I24a4c78d4fbddb1e5602fde0ac13a881d9e055ac
Signed-off-by: youbowu <wu.youbo@zte.com.cn>
Diffstat (limited to 'engine-d/src/main/java')
-rw-r--r-- | engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java | 619 |
1 files changed, 308 insertions, 311 deletions
diff --git a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java index d9f55ed..ad1b4e5 100644 --- a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java @@ -1,311 +1,308 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.openo.holmes.engine.manager; - - -import java.io.Serializable; -import java.io.StringReader; -import java.util.List; -import java.util.Locale; -import javax.annotation.PostConstruct; -import javax.inject.Inject; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import lombok.extern.slf4j.Slf4j; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.drools.KnowledgeBase; -import org.drools.KnowledgeBaseConfiguration; -import org.drools.KnowledgeBaseFactory; -import org.drools.builder.KnowledgeBuilder; -import org.drools.builder.KnowledgeBuilderFactory; -import org.drools.builder.ResourceType; -import org.drools.conf.EventProcessingOption; -import org.drools.definition.KnowledgePackage; -import org.drools.io.Resource; -import org.drools.io.ResourceFactory; -import org.drools.runtime.StatefulKnowledgeSession; -import org.drools.runtime.rule.FactHandle; -import org.glassfish.hk2.api.IterableProvider; -import org.jvnet.hk2.annotations.Service; -import org.openo.holmes.common.api.entity.CorrelationRule; -import org.openo.holmes.common.api.stat.Alarm; -import org.openo.holmes.common.config.MQConfig; -import org.openo.holmes.common.constant.AlarmConst; -import org.openo.holmes.common.exception.CorrelationException; -import org.openo.holmes.common.utils.ExceptionUtil; -import org.openo.holmes.common.utils.I18nProxy; -import org.openo.holmes.engine.request.DeployRuleRequest; -import org.openo.holmes.engine.wrapper.RuleMgtWrapper; - -@Slf4j -@Service -public class DroolsEngine { - private final static int ENABLE = 1; - - @Inject - private RuleMgtWrapper ruleMgtWrapper; - - private KnowledgeBase kbase; - - private KnowledgeBaseConfiguration kconf; - - private StatefulKnowledgeSession ksession; - - private KnowledgeBuilder kbuilder; - - @Inject - private IterableProvider<MQConfig> mqConfigProvider; - - private ConnectionFactory connectionFactory; - - @PostConstruct - private void init() { - try { - // 1. start engine - start(); - // 2. start mq listener - registerAlarmTopicListener(); - } catch (Exception e) { - log.error("Start service failed: " + e.getMessage(), e); - throw ExceptionUtil.buildExceptionResponse("Start service failed!"); - } - } - - private void registerAlarmTopicListener() { - String brokerURL = - "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; - connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, - mqConfigProvider.get().brokerPassword, brokerURL); - - AlarmMqMessageListener listener = new AlarmMqMessageListener(); - listener.receive(); - } - - - private void start() throws CorrelationException { - log.info("Drools Engine Initialize Beginning..."); - - initEngineParameter(); - initDeployRule(); - - log.info("Business Rule Engine Initialize Successfully."); - } - - public void stop() { - this.ksession.dispose(); - } - - private void initEngineParameter(){ - this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration(); - - this.kconf.setOption(EventProcessingOption.STREAM); - - this.kconf.setProperty("drools.assertBehaviour", "equality"); - - this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - - this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf); - - this.ksession = kbase.newStatefulKnowledgeSession(); - } - - private void initDeployRule() throws CorrelationException { - List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE); - - if (rules.isEmpty()) { - return; - } - for (CorrelationRule rule : rules) { - if (rule.getContent() != null) { - deployRuleFromDB(rule.getContent()); - } - } - } - - private void deployRuleFromDB(String ruleContent) throws CorrelationException { - StringReader reader = new StringReader(ruleContent); - Resource res = ResourceFactory.newReaderResource(reader); - - if (kbuilder == null) { - kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - } - - kbuilder.add(res, ResourceType.DRL); - - try { - - kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); - } catch (Exception e) { - throw new CorrelationException(e.getMessage(), e); - } - - ksession.fireAllRules(); - } - - public synchronized String deployRule(DeployRuleRequest rule, Locale locale) - throws CorrelationException { - StringReader reader = new StringReader(rule.getContent()); - Resource res = ResourceFactory.newReaderResource(reader); - - if (kbuilder == null) { - kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - } - - kbuilder.add(res, ResourceType.DRL); - - if (kbuilder.hasErrors()) { - - String errorMsg = I18nProxy.getInstance().getValueByArgs(locale, - I18nProxy.ENGINE_CONTENT_ILLEGALITY, - new String[]{kbuilder.getErrors().toString()}); - throw new CorrelationException(errorMsg); - } - - KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next(); - - if (kbase.getKnowledgePackages().contains(kpackage)) { - - String errorMsg = I18nProxy.getInstance().getValueByArgs(locale, - I18nProxy.ENGINE_CONTENT_ILLEGALITY,new String[]{ - I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)}); - - throw new CorrelationException(errorMsg); - } - try { - - kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); - } catch (Exception e) { - - String errorMsg = - I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED); - throw new CorrelationException(errorMsg, e); - } - - ksession.fireAllRules(); - return kpackage.getName(); - } - - public synchronized void undeployRule(String packageName, Locale locale) - throws CorrelationException { - - KnowledgePackage pkg = kbase.getKnowledgePackage(packageName); - - if (null == pkg) { - String errorMsg = I18nProxy.getInstance().getValueByArgs(locale, - I18nProxy.ENGINE_DELETE_RULE_NULL, - new String[]{packageName}); - throw new CorrelationException(errorMsg); - } - - try { - - kbase.removeKnowledgePackage(pkg.getName()); - } catch (Exception e) { - String errorMsg = I18nProxy.getInstance().getValueByArgs(locale, - I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName}); - throw new CorrelationException(errorMsg, e); - } - } - - public void compileRule(String content, Locale locale) - throws CorrelationException { - StringReader reader = new StringReader(content); - Resource res = ResourceFactory.newReaderResource(reader); - - if (kbuilder == null) { - kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); - } - - kbuilder.add(res, ResourceType.DRL); - - if (kbuilder.hasErrors()) { - String errorMsg = I18nProxy.getInstance().getValueByArgs(locale, - I18nProxy.ENGINE_CONTENT_ILLEGALITY, - new String[]{kbuilder.getErrors().toString()}); - log.error(errorMsg); - throw new CorrelationException(errorMsg); - } - } - - public void putRaisedIntoStream(Alarm raiseAlarm) { - FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm); - if (factHandle != null) { - this.ksession.retract(factHandle); - } - this.ksession.insert(raiseAlarm); - this.ksession.fireAllRules(); - } - - class AlarmMqMessageListener implements MessageListener { - - private Connection connection = null; - private Session session = null; - private Destination destination = null; - private MessageConsumer consumer = null; - - private void initialize() throws JMSException { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); - consumer = session.createConsumer(destination); - connection.start(); - } - - public void receive() { - try { - initialize(); - consumer.setMessageListener(this); - } catch (JMSException e) { - log.error("Failed to connect to the MQ service : " + e.getMessage(), e); - try { - close(); - } catch (JMSException e1) { - log.error("Failed close connection " + e1.getMessage(), e1); - } - } - } - - public void onMessage(Message arg0) { - ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; - try { - Serializable object = objectMessage.getObject(); - - if (object instanceof Alarm) { - Alarm alarm = (Alarm) object; - putRaisedIntoStream(alarm); - } - } catch (JMSException e) { - log.error("Failed get object : " + e.getMessage(), e); - } - } - - private void close() throws JMSException { - if (consumer != null) - consumer.close(); - if (session != null) - session.close(); - if (connection != null) - connection.close(); - } - } -} +/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.openo.holmes.engine.manager;
+
+
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Locale;
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.drools.KnowledgeBase;
+import org.drools.KnowledgeBaseConfiguration;
+import org.drools.KnowledgeBaseFactory;
+import org.drools.builder.KnowledgeBuilder;
+import org.drools.builder.KnowledgeBuilderFactory;
+import org.drools.builder.ResourceType;
+import org.drools.conf.EventProcessingOption;
+import org.drools.definition.KnowledgePackage;
+import org.drools.io.Resource;
+import org.drools.io.ResourceFactory;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+import org.glassfish.hk2.api.IterableProvider;
+import org.jvnet.hk2.annotations.Service;
+import org.openo.holmes.common.api.entity.CorrelationRule;
+import org.openo.holmes.common.api.stat.Alarm;
+import org.openo.holmes.common.config.MQConfig;
+import org.openo.holmes.common.constant.AlarmConst;
+import org.openo.holmes.common.exception.CorrelationException;
+import org.openo.holmes.common.utils.ExceptionUtil;
+import org.openo.holmes.common.utils.I18nProxy;
+import org.openo.holmes.engine.request.DeployRuleRequest;
+import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
+
+@Slf4j
+@Service
+public class DroolsEngine {
+
+ private final static int ENABLE = 1;
+
+ @Inject
+ private RuleMgtWrapper ruleMgtWrapper;
+
+ private KnowledgeBase kbase;
+
+ private KnowledgeBaseConfiguration kconf;
+
+ private StatefulKnowledgeSession ksession;
+
+ private KnowledgeBuilder kbuilder;
+
+ @Inject
+ private IterableProvider<MQConfig> mqConfigProvider;
+
+ private ConnectionFactory connectionFactory;
+
+ @PostConstruct
+ private void init() {
+ try {
+ // 1. start engine
+ start();
+ // 2. start mq listener
+ registerAlarmTopicListener();
+ } catch (Exception e) {
+ log.error("Start service failed: " + e.getMessage(), e);
+ throw ExceptionUtil.buildExceptionResponse("Start service failed!");
+ }
+ }
+
+ private void registerAlarmTopicListener() {
+ String brokerURL =
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
+ mqConfigProvider.get().brokerPassword, brokerURL);
+
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();
+ listener.receive();
+ }
+
+
+ private void start() throws CorrelationException {
+ log.info("Drools Engine Initialize Beginning...");
+
+ initEngineParameter();
+ initDeployRule();
+
+ log.info("Business Rule Engine Initialize Successfully.");
+ }
+
+ public void stop() {
+ this.ksession.dispose();
+ }
+
+ private void initEngineParameter() {
+ this.kconf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
+
+ this.kconf.setOption(EventProcessingOption.STREAM);
+
+ this.kconf.setProperty("drools.assertBehaviour", "equality");
+
+ this.kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+
+ this.kbase = KnowledgeBaseFactory.newKnowledgeBase("D-ENGINE", this.kconf);
+
+ this.ksession = kbase.newStatefulKnowledgeSession();
+ }
+
+ private void initDeployRule() throws CorrelationException {
+ List<CorrelationRule> rules = ruleMgtWrapper.queryRuleByEnable(ENABLE);
+
+ if (rules.isEmpty()) {
+ return;
+ }
+ for (CorrelationRule rule : rules) {
+ if (rule.getContent() != null) {
+ deployRuleFromDB(rule.getContent());
+ }
+ }
+ }
+
+ private void deployRuleFromDB(String ruleContent) throws CorrelationException {
+ StringReader reader = new StringReader(ruleContent);
+ Resource res = ResourceFactory.newReaderResource(reader);
+
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+
+ kbuilder.add(res, ResourceType.DRL);
+
+ try {
+
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+ } catch (Exception e) {
+ throw new CorrelationException(e.getMessage(), e);
+ }
+ ksession.fireAllRules();
+ }
+
+ public synchronized String deployRule(DeployRuleRequest rule, Locale locale)
+ throws CorrelationException {
+ StringReader reader = new StringReader(rule.getContent());
+ Resource res = ResourceFactory.newReaderResource(reader);
+
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+
+ kbuilder.add(res, ResourceType.DRL);
+
+ if (kbuilder.hasErrors()) {
+
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY,
+ new String[]{kbuilder.getErrors().toString()});
+ throw new CorrelationException(errorMsg);
+ }
+
+ KnowledgePackage kpackage = kbuilder.getKnowledgePackages().iterator().next();
+
+ if (kbase.getKnowledgePackages().contains(kpackage)) {
+
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY, new String[]{
+ I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_CONTAINS_PACKAGE)});
+
+ throw new CorrelationException(errorMsg);
+ }
+ try {
+
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+ } catch (Exception e) {
+
+ String errorMsg =
+ I18nProxy.getInstance().getValue(locale, I18nProxy.ENGINE_DEPLOY_RULE_FAILED);
+ throw new CorrelationException(errorMsg, e);
+ }
+
+ ksession.fireAllRules();
+ return kpackage.getName();
+ }
+
+ public synchronized void undeployRule(String packageName, Locale locale)
+ throws CorrelationException {
+
+ KnowledgePackage pkg = kbase.getKnowledgePackage(packageName);
+
+ if (null == pkg) {
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
+ I18nProxy.ENGINE_DELETE_RULE_NULL,
+ new String[]{packageName});
+ throw new CorrelationException(errorMsg);
+ }
+
+ try {
+
+ kbase.removeKnowledgePackage(pkg.getName());
+ } catch (Exception e) {
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
+ I18nProxy.ENGINE_DELETE_RULE_FAILED, new String[]{packageName});
+ throw new CorrelationException(errorMsg, e);
+ }
+ }
+
+ public void compileRule(String content, Locale locale)
+ throws CorrelationException {
+ StringReader reader = new StringReader(content);
+ Resource res = ResourceFactory.newReaderResource(reader);
+
+ kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
+
+ kbuilder.add(res, ResourceType.DRL);
+
+ if (kbuilder.hasErrors()) {
+ String errorMsg = I18nProxy.getInstance().getValueByArgs(locale,
+ I18nProxy.ENGINE_CONTENT_ILLEGALITY,
+ new String[]{kbuilder.getErrors().toString()});
+ log.error(errorMsg);
+ throw new CorrelationException(errorMsg);
+ }
+ }
+
+ public void putRaisedIntoStream(Alarm raiseAlarm) {
+ FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
+ if (factHandle != null) {
+ this.ksession.retract(factHandle);
+ }
+ this.ksession.insert(raiseAlarm);
+ this.ksession.fireAllRules();
+ }
+
+ class AlarmMqMessageListener implements MessageListener {
+
+ private Connection connection = null;
+ private Session session = null;
+ private Destination destination = null;
+ private MessageConsumer consumer = null;
+
+ private void initialize() throws JMSException {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ }
+
+ public void receive() {
+ try {
+ initialize();
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+ try {
+ close();
+ } catch (JMSException e1) {
+ log.error("Failed close connection " + e1.getMessage(), e1);
+ }
+ }
+ }
+
+ public void onMessage(Message arg0) {
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
+ try {
+ Serializable object = objectMessage.getObject();
+
+ if (object instanceof Alarm) {
+ Alarm alarm = (Alarm) object;
+ putRaisedIntoStream(alarm);
+ }
+ } catch (JMSException e) {
+ log.error("Failed get object : " + e.getMessage(), e);
+ }
+ }
+
+ private void close() throws JMSException {
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}
|