summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--engine-d-standalone/src/main/assembly/bin/initDB.sh1
-rw-r--r--engine-d-standalone/src/main/assembly/conf/engine-d.yml9
-rw-r--r--engine-d/src/main/java/org/openo/holmes/engine/db/CorrelationRuleDao.java2
-rw-r--r--engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java78
-rw-r--r--engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java102
5 files changed, 141 insertions, 51 deletions
diff --git a/engine-d-standalone/src/main/assembly/bin/initDB.sh b/engine-d-standalone/src/main/assembly/bin/initDB.sh
index 7f16c2a..a02391b 100644
--- a/engine-d-standalone/src/main/assembly/bin/initDB.sh
+++ b/engine-d-standalone/src/main/assembly/bin/initDB.sh
@@ -23,6 +23,7 @@ port=$3
host=$4
echo "start init holmes engine-d db"
main_path=$HOME/../
+cat $main_path
mysql -u$user -p$password -P$port -h$host <$main_path/dbscripts/mysql/openo-holmes_engine_d-createobj.sql
sql_result=$?
if [ $sql_result != 0 ] ; then
diff --git a/engine-d-standalone/src/main/assembly/conf/engine-d.yml b/engine-d-standalone/src/main/assembly/conf/engine-d.yml
index 3a11fcb..66597d9 100644
--- a/engine-d-standalone/src/main/assembly/conf/engine-d.yml
+++ b/engine-d-standalone/src/main/assembly/conf/engine-d.yml
@@ -20,7 +20,7 @@ server:
adminContextPath: /admin
connector:
type: http
- port: 12003
+ port: 9102
# Logging settings.
@@ -50,8 +50,8 @@ logging:
database:
driverClass: com.mysql.jdbc.Driver
- user: root
- password: rootpass
+ user: holmes
+ password: holmes
url: jdbc:mysql://10.74.156.206:3306/holmes
properties:
charSet: UTF-8
@@ -69,3 +69,6 @@ mqConfig:
brokerUsername: activemq
brokerPassword: v1
+msbServerAddr: http://127.0.0.1:80
+
+
diff --git a/engine-d/src/main/java/org/openo/holmes/engine/db/CorrelationRuleDao.java b/engine-d/src/main/java/org/openo/holmes/engine/db/CorrelationRuleDao.java
index 0c00102..4370054 100644
--- a/engine-d/src/main/java/org/openo/holmes/engine/db/CorrelationRuleDao.java
+++ b/engine-d/src/main/java/org/openo/holmes/engine/db/CorrelationRuleDao.java
@@ -27,7 +27,7 @@ import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
public abstract class CorrelationRuleDao {
- @SqlQuery("SELECT * FROM aplus_rule WHERE enable=:enable")
+ @SqlQuery("SELECT * FROM APLUS_RULE WHERE enable=:enable")
public abstract List<CorrelationRule> queryRuleByEnable(@Bind("enable") int enable);
public List<CorrelationRule> queryRuleByRuleEnable(int enable) {
diff --git a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java
index 0ed452a..d9f55ed 100644
--- a/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java
+++ b/engine-d/src/main/java/org/openo/holmes/engine/manager/DroolsEngine.java
@@ -16,6 +16,7 @@
package org.openo.holmes.engine.manager;
+import java.io.Serializable;
import java.io.StringReader;
import java.util.List;
import java.util.Locale;
@@ -25,11 +26,13 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
@@ -57,11 +60,6 @@ import org.openo.holmes.engine.wrapper.RuleMgtWrapper;
@Slf4j
@Service
public class DroolsEngine {
-
- private final static String CORRELATION_RULE = "CORRELATION_RULE";
-
- private final static String CORRELATION_ALARM = "CORRELATION_ALARM";
-
private final static int ENABLE = 1;
@Inject
@@ -99,8 +97,8 @@ public class DroolsEngine {
connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
mqConfigProvider.get().brokerPassword, brokerURL);
- Thread thread = new Thread(new AlarmMqMessageListener());
- thread.start();
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();
+ listener.receive();
}
@@ -258,34 +256,56 @@ public class DroolsEngine {
this.ksession.fireAllRules();
}
- class AlarmMqMessageListener implements Runnable {
+ class AlarmMqMessageListener implements MessageListener {
+
+ private Connection connection = null;
+ private Session session = null;
+ private Destination destination = null;
+ private MessageConsumer consumer = null;
- public void run() {
- Connection connection;
- Session session;
- Destination destination;
- MessageConsumer messageConsumer;
+ private void initialize() throws JMSException {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ }
+
+ public void receive() {
+ try {
+ initialize();
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+ try {
+ close();
+ } catch (JMSException e1) {
+ log.error("Failed close connection " + e1.getMessage(), e1);
+ }
+ }
+ }
+ public void onMessage(Message arg0) {
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
try {
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- messageConsumer = session.createConsumer(destination);
-
- while (true) {
- ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
- if (objMessage != null) {
- putRaisedIntoStream((Alarm) objMessage.getObject());
- } else {
- break;
- }
+ Serializable object = objectMessage.getObject();
+
+ if (object instanceof Alarm) {
+ Alarm alarm = (Alarm) object;
+ putRaisedIntoStream(alarm);
}
} catch (JMSException e) {
- log.error("connection mq service Failed: " + e.getMessage(), e);
+ log.error("Failed get object : " + e.getMessage(), e);
}
+ }
+ private void close() throws JMSException {
+ if (consumer != null)
+ consumer.close();
+ if (session != null)
+ session.close();
+ if (connection != null)
+ connection.close();
}
}
-
}
diff --git a/engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java b/engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java
index 1740d62..0f637c7 100644
--- a/engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java
+++ b/engine-d/src/test/java/org/openo/holmes/engine/manager/DroolsEngineTest.java
@@ -36,6 +36,7 @@ import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.builder.KnowledgeBuilder;
@@ -362,59 +363,124 @@ public class DroolsEngineTest {
PowerMock.verifyAll();
}
+
@Test
- public void listener_run_objmessage_is_null() throws JMSException {
+ public void listener_receive() throws JMSException {
DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
Connection connection = PowerMock.createMock(Connection.class);
Session session = PowerMock.createMock(Session.class);
Destination destination = PowerMock.createMock(Topic.class);
- MessageConsumer messageConsumer = PowerMock.createMock(MessageConsumer.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
expect(connectionFactory.createConnection()).andReturn(connection);
connection.start();
expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(messageConsumer);
- expect(messageConsumer.receive(anyLong())).andReturn(null);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
PowerMock.replayAll();
- listener.run();
+ listener.receive();
PowerMock.verifyAll();
}
@Test
- public void listener_run_objmessage_is_not_null() throws JMSException {
+ public void listener_exception() throws JMSException {
DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
Connection connection = PowerMock.createMock(Connection.class);
Session session = PowerMock.createMock(Session.class);
Destination destination = PowerMock.createMock(Topic.class);
- MessageConsumer messageConsumer = PowerMock.createMock(MessageConsumer.class);
- ObjectMessage objMessage = PowerMock.createMock(ObjectMessage.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
- FactHandle factHandle = PowerMock.createMock(FactHandle.class);
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
expect(connectionFactory.createConnection()).andReturn(connection);
connection.start();
expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(messageConsumer);
- expect(messageConsumer.receive(anyLong())).andReturn(objMessage);
- expect(objMessage.getObject()).andReturn(new Alarm());
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
- expect(ksession.getFactHandle(anyObject(Alarm.class))).andReturn(factHandle);
- ksession.retract(anyObject(FactHandle.class));
- expect(ksession.insert(anyObject(Alarm.class))).andReturn(null);
- expect(ksession.fireAllRules()).andReturn(0);
+ consumer.close();
+ session.close();
+ connection.close();
+
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_close_exception() throws JMSException {
+ DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
- expect(messageConsumer.receive(anyLong())).andReturn(null);
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ consumer.close();
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_on_message() throws JMSException {
+ DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
+ Alarm alarm = new Alarm();
+ alarm.setAlarmKey("alarmKey");
+ ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
+ objectMessage.setObject(alarm);
+
+ expect(ksession.getFactHandle(anyObject(Alarm.class))).andReturn(null);
+
+ expect(ksession.insert(anyObject(Alarm.class))).andReturn(null);
+ expect(ksession.fireAllRules()).andReturn(1);
PowerMock.replayAll();
- listener.run();
+ listener.onMessage(objectMessage);
PowerMock.verifyAll();
}