diff options
Diffstat (limited to 'engine-d/src/main')
4 files changed, 215 insertions, 88 deletions
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java new file mode 100644 index 0000000..24e0817 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaappolling; + +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Slf4j +public class DMaaPPollingRequest implements Runnable { + + private Subscriber subscriber; + + private DroolsEngine droolsEngine; + + public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) { + this.subscriber = subscriber; + this.droolsEngine = droolsEngine; + } + + public void run() { + List<VesAlarm> vesAlarmList = new ArrayList<>(); + try { + vesAlarmList = subscriber.subscribe(); + } catch (CorrelationException e) { + log.error("Failed polling request alarm." + e.getMessage()); + } + vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java new file mode 100644 index 0000000..1e71899 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaappolling; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.inject.Inject; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +public class SubscriberAction { + + @Inject + private DroolsEngine droolsEngine; + + private ConcurrentHashMap<String, ScheduledFuture> pollingRequests = new ConcurrentHashMap<String, ScheduledFuture>(); + private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + + public void addSubscriber(Subscriber subscriber) { + DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); + ScheduledFuture future = service + .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); + pollingRequests.put(subscriber.getTopic(), future); + } + + public void removeSubscriber(Subscriber subscriber) { + ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); + if (future != null) { + future.cancel(true); + } + pollingRequests.remove(subscriber.getTopic()); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java index 5d1f442..b23dde0 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java @@ -16,7 +16,6 @@ package org.onap.holmes.engine.manager;
-import java.io.Serializable;
import java.io.StringReader;
import java.util.HashSet;
import java.util.List;
@@ -24,17 +23,7 @@ import java.util.Locale; import java.util.Set;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQObjectMessage;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseConfiguration;
import org.drools.KnowledgeBaseFactory;
@@ -47,13 +36,10 @@ import org.drools.io.Resource; import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
-import org.glassfish.hk2.api.IterableProvider;
import org.jvnet.hk2.annotations.Service;
+import org.onap.holmes.common.api.stat.VesAlarm;
import org.onap.holmes.engine.request.DeployRuleRequest;
import org.onap.holmes.common.api.entity.CorrelationRule;
-import org.onap.holmes.common.api.stat.Alarm;
-import org.onap.holmes.common.config.MQConfig;
-import org.onap.holmes.common.constant.AlarmConst;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.ExceptionUtil;
import org.onap.holmes.engine.wrapper.RuleMgtWrapper;
@@ -69,34 +55,18 @@ public class DroolsEngine { private KnowledgeBase kbase;
private KnowledgeBaseConfiguration kconf;
private StatefulKnowledgeSession ksession;
- @Inject
- private IterableProvider<MQConfig> mqConfigProvider;
- private ConnectionFactory connectionFactory;
@PostConstruct
private void init() {
try {
- // 1. start engine
+ // start engine
start();
- // 2. start mq listener
- registerAlarmTopicListener();
} catch (Exception e) {
log.error("Failed to start the service: " + e.getMessage(), e);
throw ExceptionUtil.buildExceptionResponse("Failed to start the drools engine!");
}
}
- private void registerAlarmTopicListener() {
- String brokerURL =
- "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort;
- connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername,
- mqConfigProvider.get().brokerPassword, brokerURL);
-
- AlarmMqMessageListener listener = new AlarmMqMessageListener();
- listener.receive();
- }
-
-
private void start() throws CorrelationException {
log.info("Drools Engine Initialize Beginning...");
@@ -219,7 +189,7 @@ public class DroolsEngine { }
}
- public void putRaisedIntoStream(Alarm raiseAlarm) {
+ public void putRaisedIntoStream(VesAlarm raiseAlarm) {
FactHandle factHandle = this.ksession.getFactHandle(raiseAlarm);
if (factHandle != null) {
this.ksession.retract(factHandle);
@@ -228,59 +198,4 @@ public class DroolsEngine { this.ksession.fireAllRules();
}
- class AlarmMqMessageListener implements MessageListener {
-
- private Connection connection = null;
- private Session session = null;
- private Destination destination = null;
- private MessageConsumer consumer = null;
-
- private void initialize() throws JMSException {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM);
- consumer = session.createConsumer(destination);
- connection.start();
- }
-
- public void receive() {
- try {
- initialize();
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- log.error("Failed to connect to the MQ service : " + e.getMessage(), e);
- try {
- close();
- } catch (JMSException e1) {
- log.error("Failed close connection " + e1.getMessage(), e1);
- }
- }
- }
-
- public void onMessage(Message arg0) {
- ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0;
- try {
- Serializable object = objectMessage.getObject();
-
- if (object instanceof Alarm) {
- Alarm alarm = (Alarm) object;
- putRaisedIntoStream(alarm);
- }
- } catch (JMSException e) {
- log.error("Failed get object : " + e.getMessage(), e);
- }
- }
-
- private void close() throws JMSException {
- if (consumer != null) {
- consumer.close();
- }
- if (session != null) {
- session.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
}
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java new file mode 100644 index 0000000..bd77312 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/mqconsumer/MQConsumer.java @@ -0,0 +1,114 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.mqconsumer; + +import java.io.Serializable; +import javax.inject.Inject; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.glassfish.hk2.api.IterableProvider; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.config.MQConfig; +import org.onap.holmes.common.constant.AlarmConst; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +@Slf4j +@NoArgsConstructor +public class MQConsumer { + + @Inject + private IterableProvider<MQConfig> mqConfigProvider; + private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory1; + @Inject + private DroolsEngine engine; + + public void registerAlarmTopicListener() { + String brokerURL = + "tcp://" + mqConfigProvider.get().brokerIp + ":" + mqConfigProvider.get().brokerPort; + connectionFactory = new ActiveMQConnectionFactory(mqConfigProvider.get().brokerUsername, + mqConfigProvider.get().brokerPassword, brokerURL); + + AlarmMqMessageListener listener = new AlarmMqMessageListener(); + listener.receive(); + } + class AlarmMqMessageListener implements MessageListener { + + private Connection connection = null; + private Session session = null; + private Destination destination = null; + private MessageConsumer consumer = null; + + private void initialize() throws JMSException { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic(AlarmConst.MQ_TOPIC_NAME_ALARM); + consumer = session.createConsumer(destination); + connection.start(); + } + + public void receive() { + try { + initialize(); + consumer.setMessageListener(this); + } catch (JMSException e) { + log.error("Failed to connect to the MQ service : " + e.getMessage(), e); + try { + close(); + } catch (JMSException e1) { + log.error("Failed close connection " + e1.getMessage(), e1); + } + } + } + + public void onMessage(Message arg0) { + ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) arg0; + try { + Serializable object = objectMessage.getObject(); + if (object instanceof VesAlarm) { + VesAlarm vesAlarm = (VesAlarm) object; + engine.putRaisedIntoStream(vesAlarm); + } + } catch (JMSException e) { + log.error("Failed get object : " + e.getMessage(), e); + } + } + + private void close() throws JMSException { + if (consumer != null) { + consumer.close(); + } + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + } +} |