From 43f5b3a0ddb7f22b3c29a4f042991155a46e41d6 Mon Sep 17 00:00:00 2001 From: Fei Tang Date: Wed, 28 Mar 2018 08:25:04 +0800 Subject: add alarm synchronization related operation Issue-ID: HOLMES-106 Change-Id: Idb47aa0726c0c8fc179c085381092e01d7a26fb3 Signed-off-by: Fei Tang --- .../org/onap/holmes/engine/db/AlarmInfoDao.java | 62 ++++++++++++++++++++++ .../holmes/engine/dmaap/DMaaPAlarmPolling.java | 35 ++++++++++-- .../onap/holmes/engine/dmaap/SubscriberAction.java | 7 ++- .../onap/holmes/engine/manager/DroolsEngine.java | 41 ++++++++++++-- 4 files changed, 136 insertions(+), 9 deletions(-) create mode 100644 engine-d/src/main/java/org/onap/holmes/engine/db/AlarmInfoDao.java (limited to 'engine-d/src/main/java/org/onap') diff --git a/engine-d/src/main/java/org/onap/holmes/engine/db/AlarmInfoDao.java b/engine-d/src/main/java/org/onap/holmes/engine/db/AlarmInfoDao.java new file mode 100644 index 0000000..c7094c3 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/db/AlarmInfoDao.java @@ -0,0 +1,62 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.db; + +import org.onap.holmes.common.api.entity.AlarmInfo; +import org.onap.holmes.common.exception.AlarmInfoException; +import org.onap.holmes.common.utils.AlarmInfoMapper; +import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper; + +import java.util.List; + +@RegisterMapper(AlarmInfoMapper.class) +public abstract class AlarmInfoDao { + + @GetGeneratedKeys + @SqlUpdate("INSERT INTO ALARM_INFO (EVENTID,EVENTNAME,STARTEPOCHMICROSEC,SOURCEID,SOURCENAME,ALARMISCLEARED,ROOTFLAG,LASTEPOCHMICROSEC) VALUES (:eventId,:eventName,:startEpochMicroSec,:sourceId,:sourceName,:alarmIsCleared,:rootFlag,:lastEpochMicroSec)") + protected abstract String addAlarm(@BindBean AlarmInfo alarmInfo); + + @SqlQuery("SELECT * FROM ALARM_INFO") + protected abstract List queryAlarm(); + + @SqlUpdate("DELETE FROM ALARM_INFO WHERE ALARMISCLEARED=:alarmiscleared") + protected abstract int deleteAlarmByAlarmIsCleared(@Bind("alarmiscleared") int alarmIsCleared); + + public AlarmInfo saveAlarm(AlarmInfo alarmInfo) throws AlarmInfoException { + try { + addAlarm(alarmInfo); + return alarmInfo; + } catch(Exception e) { + throw new AlarmInfoException("Can not access the database. Please contact the administrator for help.", e); + } + } + + public List queryAllAlarm() throws AlarmInfoException { + try { + return queryAlarm(); + } catch(Exception e) { + throw new AlarmInfoException("Can not access the database. Please contact the administrator for help.", e); + } + } + + public void deleteClearedAlarm(AlarmInfo alarmInfo) { + int alarmIsCleared = alarmInfo.getAlarmIsCleared(); + if(alarmIsCleared == 1) { + deleteAlarmByAlarmIsCleared(alarmIsCleared); + } + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java index 935d2c8..1446b14 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java @@ -14,24 +14,32 @@ * limitations under the License. */ package org.onap.holmes.engine.dmaap; -import java.util.ArrayList; -import java.util.List; + import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.api.entity.AlarmInfo; import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.exception.AlarmInfoException; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.db.AlarmInfoDao; import org.onap.holmes.engine.manager.DroolsEngine; +import java.util.ArrayList; +import java.util.List; + @Slf4j public class DMaaPAlarmPolling implements Runnable { private Subscriber subscriber; private DroolsEngine droolsEngine; private volatile boolean isAlive = true; + private AlarmInfoDao alarmInfoDao; - public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine) { + + public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine, AlarmInfoDao alarmInfoDao) { this.subscriber = subscriber; this.droolsEngine = droolsEngine; + this.alarmInfoDao = alarmInfoDao; } public void run() { @@ -39,7 +47,14 @@ public class DMaaPAlarmPolling implements Runnable { List vesAlarmList = new ArrayList<>(); try { vesAlarmList = subscriber.subscribe(); - vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + vesAlarmList.forEach(vesAlarm -> { + try { + alarmInfoDao.saveAlarm(getAlarmInfo(vesAlarm)); + droolsEngine.putRaisedIntoStream(vesAlarm); + } catch(AlarmInfoException e) { + log.error("Failed to save alarm to database", e); + } + }); } catch (CorrelationException e) { log.error("Failed to process alarms. Sleep for 60 seconds to restart.", e); try { @@ -59,6 +74,18 @@ public class DMaaPAlarmPolling implements Runnable { } } } + private AlarmInfo getAlarmInfo(VesAlarm vesAlarm) { + AlarmInfo alarmInfo = new AlarmInfo(); + alarmInfo.setAlarmIsCleared(vesAlarm.getAlarmIsCleared()); + alarmInfo.setSourceName(vesAlarm.getSourceName()); + alarmInfo.setSourceId(vesAlarm.getSourceId()); + alarmInfo.setStartEpochMicroSec(vesAlarm.getStartEpochMicrosec()); + alarmInfo.setLastEpochMicroSec(vesAlarm.getLastEpochMicrosec()); + alarmInfo.setEventId(vesAlarm.getEventId()); + alarmInfo.setEventName(vesAlarm.getEventName()); + alarmInfo.setRootFlag(vesAlarm.getRootFlag()); + return alarmInfo; + } public void stopTask() { isAlive = false; diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java index c2fa5b8..b02cbe4 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java @@ -23,7 +23,9 @@ import javax.annotation.PreDestroy; import javax.inject.Inject; import lombok.extern.slf4j.Slf4j; import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.utils.DbDaoUtil; import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.db.AlarmInfoDao; import org.onap.holmes.engine.manager.DroolsEngine; @Service @@ -32,12 +34,15 @@ public class SubscriberAction { @Inject private DroolsEngine droolsEngine; + @Inject + private DbDaoUtil daoUtil; private HashMap pollingTasks = new HashMap<>(); public synchronized void addSubscriber(Subscriber subscriber) { String topic = subscriber.getTopic(); if (topic != null && !pollingTasks.containsKey(topic)) { - DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine); + AlarmInfoDao alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class); + DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine, alarmInfoDao); Thread thread = new Thread(pollingTask); thread.start(); pollingTasks.put(topic, pollingTask); diff --git a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java index d790ee7..3740f18 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/manager/DroolsEngine.java @@ -21,11 +21,11 @@ import java.util.List; import java.util.Locale; import java.util.Set; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.inject.Inject; import lombok.extern.slf4j.Slf4j; import org.drools.compiler.kie.builder.impl.InternalKieModule; import org.jvnet.hk2.annotations.Service; + import org.kie.api.KieBase; import org.kie.api.KieServices; import org.kie.api.builder.KieBuilder; @@ -45,8 +45,14 @@ import org.kie.api.runtime.KieContainer; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.conf.ClockTypeOption; import org.kie.api.runtime.rule.FactHandle; + +import org.onap.holmes.common.api.entity.AlarmInfo; + import org.onap.holmes.common.api.stat.VesAlarm; import org.onap.holmes.common.dmaap.DmaapService; +import org.onap.holmes.common.exception.AlarmInfoException; +import org.onap.holmes.common.utils.DbDaoUtil; +import org.onap.holmes.engine.db.AlarmInfoDao; import org.onap.holmes.engine.request.DeployRuleRequest; import org.onap.holmes.common.api.entity.CorrelationRule; import org.onap.holmes.common.exception.CorrelationException; @@ -62,6 +68,7 @@ public class DroolsEngine { @Inject private RuleMgtWrapper ruleMgtWrapper; + private KieBase kieBase; private KieSession kieSession; private KieContainer kieContainer; @@ -71,8 +78,14 @@ public class DroolsEngine { private KieResources resources; private KieRepository kieRepository; + private AlarmInfoDao alarmInfoDao; + @Inject + private DbDaoUtil daoUtil; + + @PostConstruct private void init() { + alarmInfoDao = daoUtil.getJdbiDaoByOnDemand(AlarmInfoDao.class); try { // start engine start(); @@ -82,13 +95,14 @@ public class DroolsEngine { } } - private void start() throws CorrelationException { + private void start() throws AlarmInfoException { log.info("Drools Engine Initialize Beginning..."); initEngineParameter(); - initDeployRule(); + alarmSynchronization(); +// initDeployRule(); - log.info("Business Rule Engine Initialize Successfully."); + log.info("Alarm synchronization Successfully."); } public void stop() { @@ -220,6 +234,7 @@ public class DroolsEngine { } this.kieSession.insert(raiseAlarm); this.kieSession.fireAllRules(); + } public List queryAllPackage() { @@ -261,4 +276,22 @@ public class DroolsEngine { kieBase.removeKiePackage(kiePackage.getName()); } + public void alarmSynchronization() throws AlarmInfoException { + alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> alarmInfoDao.deleteClearedAlarm(alarmInfo)); + alarmInfoDao.queryAllAlarm().forEach(alarmInfo -> putRaisedIntoStream(convertAlarmInfo2VesAlarm(alarmInfo))); + } + + private VesAlarm convertAlarmInfo2VesAlarm(AlarmInfo alarmInfo) { + VesAlarm vesAlarm = new VesAlarm(); + vesAlarm.setEventId(alarmInfo.getEventId()); + vesAlarm.setEventName(alarmInfo.getEventName()); + vesAlarm.setStartEpochMicrosec(alarmInfo.getStartEpochMicroSec()); + vesAlarm.setSourceId(alarmInfo.getSourceId()); + vesAlarm.setSourceName(alarmInfo.getSourceName()); + vesAlarm.setRootFlag(alarmInfo.getRootFlag()); + vesAlarm.setAlarmIsCleared(alarmInfo.getAlarmIsCleared()); + vesAlarm.setLastEpochMicrosec(alarmInfo.getLastEpochMicroSec()); + return vesAlarm; + } + } -- cgit 1.2.3-korg