summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--engine-d/pom.xml4
-rw-r--r--engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java47
-rw-r--r--engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java51
-rw-r--r--engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java91
-rw-r--r--engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java114
-rw-r--r--engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java149
-rw-r--r--engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java189
-rw-r--r--pom.xml7
8 files changed, 422 insertions, 230 deletions
diff --git a/engine-d/pom.xml b/engine-d/pom.xml
index bc8cb20..a5f85dc 100644
--- a/engine-d/pom.xml
+++ b/engine-d/pom.xml
@@ -43,6 +43,10 @@
<artifactId>reflections</artifactId>
</dependency>
<dependency>
+ <groupId>org.onap.holmes.dsa</groupId>
+ <artifactId>dmaap-dsa</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.onap.holmes.common</groupId>
<artifactId>holmes-actions</artifactId>
<exclusions>
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java
new file mode 100644
index 0000000..24e0817
--- /dev/null
+++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.dmaappolling;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.holmes.common.api.stat.VesAlarm;
+import org.onap.holmes.common.exception.CorrelationException;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Slf4j
+public class DMaaPPollingRequest implements Runnable {
+
+ private Subscriber subscriber;
+
+ private DroolsEngine droolsEngine;
+
+ public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) {
+ this.subscriber = subscriber;
+ this.droolsEngine = droolsEngine;
+ }
+
+ public void run() {
+ List<VesAlarm> vesAlarmList = new ArrayList<>();
+ try {
+ vesAlarmList = subscriber.subscribe();
+ } catch (CorrelationException e) {
+ log.error("Failed polling request alarm." + e.getMessage());
+ }
+ vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm));
+ }
+}
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java
new file mode 100644
index 0000000..1e71899
--- /dev/null
+++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.dmaappolling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import javax.inject.Inject;
+import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.dsa.dmaappolling.Subscriber;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Service
+public class SubscriberAction {
+
+ @Inject
+ private DroolsEngine droolsEngine;
+
+ private ConcurrentHashMap<String, ScheduledFuture> pollingRequests = new ConcurrentHashMap<String, ScheduledFuture>();
+ private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+ public void addSubscriber(Subscriber subscriber) {
+ DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine);
+ ScheduledFuture future = service
+ .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS);
+ pollingRequests.put(subscriber.getTopic(), future);
+ }
+
+ public void removeSubscriber(Subscriber subscriber) {
+ ScheduledFuture future = pollingRequests.get(subscriber.getTopic());
+ if (future != null) {
+ future.cancel(true);
+ }
+ pollingRequests.remove(subscriber.getTopic());
+ }
+}
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java
index 5d1f442..b23dde0 100644
--- a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java
+++ b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java
@@ -16,7 +16,6 @@
package org.onap.holmes.engine.manager;
-import java.io.Serializable;
import java.io.StringReader;
import java.util.HashSet;
import java.util.List;
@@ -24,17 +23,7 @@ import java.util.Locale;
import java.util.Set;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
@@ -47,13 +36,10 @@ import org.drools.io.Resource;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
-import org.glassfish.hk2.api.IterableProvider;
import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.common.api.stat.VesAlarm;
import org.onap.holmes.engine.request.DeployRuleRequest;
import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.api.stat.Alarm;
-import org.onap.holmes.common.config.MQConfig;
-import org.onap.holmes.common.constant.AlarmConst;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.ExceptionUtil;
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
@@ -69,34 +55,18 @@ public class DroolsEngine {
private KnowledgeBase kbase;
private KnowledgeBaseConfiguration kconf;
private StatefulKnowledgeSession ksession;
- @Inject
- private IterableProvider<MQConfig> mqConfigProvider;
- private ConnectionFactory connectionFactory;
@PostConstruct
private void init() {
try {
- // 1. start engine
+ // start engine
start();
- // 2. start mq listener
- registerAlarmTopicListener();
} catch (Exception e) {
log.error("Failed to start the service: " + e.getMessage(), e);
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");
}
}
- private void registerAlarmTopicListener() {
- String brokerURL =
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
- mqConfigProvider.get().brokerPassword, brokerURL);
-
- AlarmMqMessageListener listener = new AlarmMqMessageListener();
- listener.receive();
- }
-
-
private void start() throws CorrelationException {
log.info("Drools Engine Initialize Beginning...");
@@ -219,7 +189,7 @@ public class DroolsEngine {
}
}
- public void putRaisedIntoStream(Alarm raiseAlarm) {
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {
FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
if (factHandle != null) {
this.ksession.retract(factHandle);
@@ -228,59 +198,4 @@ public class DroolsEngine {
this.ksession.fireAllRules();
}
- class AlarmMqMessageListener implements MessageListener {
-
- private Connection connection = null;
- private Session session = null;
- private Destination destination = null;
- private MessageConsumer consumer = null;
-
- private void initialize() throws JMSException {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- consumer = session.createConsumer(destination);
- connection.start();
- }
-
- public void receive() {
- try {
- initialize();
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
- try {
- close();
- } catch (JMSException e1) {
- log.error("Failed close connection " + e1.getMessage(), e1);
- }
- }
- }
-
- public void onMessage(Message arg0) {
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
- try {
- Serializable object = objectMessage.getObject();
-
- if (object instanceof Alarm) {
- Alarm alarm = (Alarm) object;
- putRaisedIntoStream(alarm);
- }
- } catch (JMSException e) {
- log.error("Failed get object : " + e.getMessage(), e);
- }
- }
-
- private void close() throws JMSException {
- if (consumer != null) {
- consumer.close();
- }
- if (session != null) {
- session.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
}
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java
new file mode 100644
index 0000000..bd77312
--- /dev/null
+++ b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.holmes.engine.mqconsumer;
+
+import java.io.Serializable;
+import javax.inject.Inject;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.glassfish.hk2.api.IterableProvider;
+import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.common.api.stat.VesAlarm;
+import org.onap.holmes.common.config.MQConfig;
+import org.onap.holmes.common.constant.AlarmConst;
+import org.onap.holmes.engine.manager.DroolsEngine;
+
+@Service
+@Slf4j
+@NoArgsConstructor
+public class MQConsumer {
+
+ @Inject
+ private IterableProvider<MQConfig> mqConfigProvider;
+ private ConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactory1;
+ @Inject
+ private DroolsEngine engine;
+
+ public void registerAlarmTopicListener() {
+ String brokerURL =
+ "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
+ connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
+ mqConfigProvider.get().brokerPassword, brokerURL);
+
+ AlarmMqMessageListener listener = new AlarmMqMessageListener();
+ listener.receive();
+ }
+ class AlarmMqMessageListener implements MessageListener {
+
+ private Connection connection = null;
+ private Session session = null;
+ private Destination destination = null;
+ private MessageConsumer consumer = null;
+
+ private void initialize() throws JMSException {
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
+ consumer = session.createConsumer(destination);
+ connection.start();
+ }
+
+ public void receive() {
+ try {
+ initialize();
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
+ try {
+ close();
+ } catch (JMSException e1) {
+ log.error("Failed close connection " + e1.getMessage(), e1);
+ }
+ }
+ }
+
+ public void onMessage(Message arg0) {
+ ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
+ try {
+ Serializable object = objectMessage.getObject();
+ if (object instanceof VesAlarm) {
+ VesAlarm vesAlarm = (VesAlarm) object;
+ engine.putRaisedIntoStream(vesAlarm);
+ }
+ } catch (JMSException e) {
+ log.error("Failed get object : " + e.getMessage(), e);
+ }
+ }
+
+ private void close() throws JMSException {
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (session != null) {
+ session.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}
diff --git a/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java b/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java
index 5487177..885b5e4 100644
--- a/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java
+++ b/engine-d/src/test/java/org/onap/holmes/engine/manager/DroolsEngineTest.java
@@ -16,38 +16,25 @@
package org.onap.holmes.engine.manager;
-import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
import org.drools.conf.EventProcessingOption;
import org.drools.runtime.StatefulKnowledgeSession;
-import org.easymock.EasyMock;
-import org.glassfish.hk2.api.IterableProvider;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.onap.holmes.common.api.stat.VesAlarm;
import org.onap.holmes.engine.request.DeployRuleRequest;
import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.api.stat.Alarm;
-import org.onap.holmes.common.config.MQConfig;
import org.onap.holmes.common.constant.AlarmConst;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
@@ -71,9 +58,6 @@ public class DroolsEngineTest {
private StatefulKnowledgeSession ksession;
- private IterableProvider<MQConfig> mqConfigProvider;
-
- private ConnectionFactory connectionFactory;
private DroolsEngine droolsEngine;
@@ -88,32 +72,24 @@ public class DroolsEngineTest {
this.ksession = kbase.newStatefulKnowledgeSession();
ruleMgtWrapper = PowerMock.createMock(RuleMgtWrapper.class);
- mqConfigProvider = PowerMock.createMock(IterableProvider.class);
- connectionFactory = PowerMock.createMock(ConnectionFactory.class);
Whitebox.setInternalState(droolsEngine, "ruleMgtWrapper", ruleMgtWrapper);
- Whitebox.setInternalState(droolsEngine, "mqConfigProvider", mqConfigProvider);
+
Whitebox.setInternalState(droolsEngine, "kconf", kconf);
Whitebox.setInternalState(droolsEngine, "kbase", kbase);
Whitebox.setInternalState(droolsEngine, "ksession", ksession);
- Whitebox.setInternalState(droolsEngine, "connectionFactory", connectionFactory);
PowerMock.resetAll();
}
@Test
public void init() throws Exception {
- MQConfig mqConfig = new MQConfig();
- mqConfig.brokerIp = "127.0.0.1";
- mqConfig.brokerPort = 4567;
- mqConfig.brokerUsername = "admin";
- mqConfig.brokerPassword = "admin";
+
List<CorrelationRule> rules = new ArrayList<CorrelationRule>();
CorrelationRule rule = new CorrelationRule();
rule.setContent("content");
rules.add(rule);
- expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes();
expect(ruleMgtWrapper.queryRuleByEnable(anyInt())).andReturn(rules);
PowerMock.replayAll();
@@ -189,128 +165,19 @@ public class DroolsEngineTest {
droolsEngine.compileRule(content, locale);
}
-
@Test
public void putRaisedIntoStream_facthandle_is_null() {
- Alarm raiseAlarm = new Alarm();
+ VesAlarm raiseAlarm = new VesAlarm();
+ raiseAlarm.setVersion((long) 245235);
droolsEngine.putRaisedIntoStream(raiseAlarm);
droolsEngine.putRaisedIntoStream(raiseAlarm);
}
@Test
public void putRaisedIntoStream_factHandle_is_not_null() {
- droolsEngine.putRaisedIntoStream(new Alarm());
- }
-
-
- @Test
- public void listener_receive() throws JMSException {
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
-
- Connection connection = PowerMock.createMock(Connection.class);
- Session session = PowerMock.createMock(Session.class);
- Destination destination = PowerMock.createMock(Topic.class);
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
-
- Whitebox.setInternalState(listener, "connection", connection);
- Whitebox.setInternalState(listener, "session", session);
- Whitebox.setInternalState(listener, "destination", destination);
- Whitebox.setInternalState(listener, "consumer", consumer);
-
- PowerMock.reset();
-
- expect(connectionFactory.createConnection()).andReturn(connection);
- connection.start();
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
- consumer.setMessageListener(listener);
-
- PowerMock.replayAll();
-
- listener.receive();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void listener_exception() throws JMSException {
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
-
- Connection connection = PowerMock.createMock(Connection.class);
- Session session = PowerMock.createMock(Session.class);
- Destination destination = PowerMock.createMock(Topic.class);
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
-
- Whitebox.setInternalState(listener, "connection", connection);
- Whitebox.setInternalState(listener, "session", session);
- Whitebox.setInternalState(listener, "destination", destination);
- Whitebox.setInternalState(listener, "consumer", consumer);
-
- PowerMock.reset();
-
- expect(connectionFactory.createConnection()).andReturn(connection);
- connection.start();
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
- consumer.setMessageListener(listener);
- EasyMock.expectLastCall().andThrow(new JMSException(""));
-
- consumer.close();
- session.close();
- connection.close();
-
- PowerMock.replayAll();
-
- listener.receive();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void listener_close_exception() throws JMSException {
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
-
- Connection connection = PowerMock.createMock(Connection.class);
- Session session = PowerMock.createMock(Session.class);
- Destination destination = PowerMock.createMock(Topic.class);
- MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
-
- Whitebox.setInternalState(listener, "connection", connection);
- Whitebox.setInternalState(listener, "session", session);
- Whitebox.setInternalState(listener, "destination", destination);
- Whitebox.setInternalState(listener, "consumer", consumer);
-
- PowerMock.reset();
-
- expect(connectionFactory.createConnection()).andReturn(connection);
- connection.start();
- expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
- expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
- expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
- consumer.setMessageListener(listener);
- EasyMock.expectLastCall().andThrow(new JMSException(""));
-
- consumer.close();
- EasyMock.expectLastCall().andThrow(new JMSException(""));
-
- PowerMock.replayAll();
-
- listener.receive();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void listener_on_message() throws JMSException {
- DroolsEngine.AlarmMqMessageListener listener = droolsEngine.new AlarmMqMessageListener();
- Alarm alarm = new Alarm();
- alarm.setAlarmKey("alarmKey");
- ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
- objectMessage.setObject(alarm);
-
- listener.onMessage(objectMessage);
+ VesAlarm raiseAlarm = new VesAlarm();
+ raiseAlarm.setVersion((long) 245235);
+ droolsEngine.putRaisedIntoStream(raiseAlarm);
}
@Test
diff --git a/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java
new file mode 100644
index 0000000..b1ea3cb
--- /dev/null
+++ b/engine-d/src/test/java/org/onap/holmes/engine/mqconsumer/MQConsumerTest.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright 2017 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.holmes.engine.mqconsumer;
+
+import static org.easymock.EasyMock.anyBoolean;
+import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.easymock.EasyMock;
+import org.glassfish.hk2.api.IterableProvider;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.holmes.common.api.stat.Alarm;
+import org.onap.holmes.common.config.MQConfig;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.reflect.Whitebox;
+
+public class MQConsumerTest {
+
+ private IterableProvider<MQConfig> mqConfigProvider;
+
+ private ConnectionFactory connectionFactory;
+
+ private MQConsumer mqConsumer;
+
+ private MQConsumer mqConsumer1;
+
+ private MQConsumer mqConsumer2;
+
+ @Before
+ public void setUp() {
+
+ mqConsumer = new MQConsumer();
+
+ mqConfigProvider = PowerMock.createMock(IterableProvider.class);
+ connectionFactory = PowerMock.createMock(ConnectionFactory.class);
+
+ Whitebox.setInternalState(mqConsumer, "mqConfigProvider", mqConfigProvider);
+ Whitebox.setInternalState(mqConsumer, "connectionFactory", connectionFactory);
+ }
+
+ @Test
+ public void init() throws Exception {
+ MQConfig mqConfig = new MQConfig();
+ mqConfig.brokerIp = "127.0.0.1";
+ mqConfig.brokerPort = 4567;
+ mqConfig.brokerUsername = "admin";
+ mqConfig.brokerPassword = "admin";
+
+ expect(mqConfigProvider.get()).andReturn(mqConfig).anyTimes();
+ PowerMock.replayAll();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_receive() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_exception() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ consumer.close();
+ session.close();
+ connection.close();
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_close_exception() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+
+ Connection connection = PowerMock.createMock(Connection.class);
+ Session session = PowerMock.createMock(Session.class);
+ Destination destination = PowerMock.createMock(Topic.class);
+ MessageConsumer consumer = PowerMock.createMock(MessageConsumer.class);
+
+ Whitebox.setInternalState(listener, "connection", connection);
+ Whitebox.setInternalState(listener, "session", session);
+ Whitebox.setInternalState(listener, "destination", destination);
+ Whitebox.setInternalState(listener, "consumer", consumer);
+
+ PowerMock.reset();
+
+ expect(connectionFactory.createConnection()).andReturn(connection);
+ connection.start();
+ expect(connection.createSession(anyBoolean(), anyInt())).andReturn(session);
+ expect(session.createTopic(anyObject(String.class))).andReturn((Topic) destination);
+ expect(session.createConsumer(anyObject(Destination.class))).andReturn(consumer);
+ consumer.setMessageListener(listener);
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ consumer.close();
+ EasyMock.expectLastCall().andThrow(new JMSException(""));
+
+ PowerMock.replayAll();
+
+ listener.receive();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void listener_on_message() throws JMSException {
+ MQConsumer.AlarmMqMessageListener listener = mqConsumer.new AlarmMqMessageListener();
+ Alarm alarm = new Alarm();
+ alarm.setAlarmKey("alarmKey");
+ ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
+ objectMessage.setObject(alarm);
+
+ listener.onMessage(objectMessage);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 8dcb2f1..3b517e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
<artifactId>holmes-engine-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
- <name>holmes-engine-parent</name>
+ <name>holmes-engine-management</name>
<modules>
<module>engine-d</module>
<module>engine-d-standalone</module>
@@ -93,6 +93,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.onap.holmes.dsa</groupId>
+ <artifactId>dmaap-dsa</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
<version>${dropwizard.version}</version>