diff options
author | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-10-26 17:13:07 +0800 |
---|---|---|
committer | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-10-26 19:50:33 +0800 |
commit | b6e1b12ee06a03b4faff592df9644575d9fb58c3 (patch) | |
tree | 33a43fdee303478c5d18a5f20f9b051963d8eb67 /engine-d/src/main/java/org | |
parent | bc90ee84d4511acab2b9be5c9a08776d54f8ece5 (diff) |
modify dmaap request way
Issue-ID: HOLMES-71
Change-Id: Ie6bc1234fd0108e651635e8f8bfb45b0dcb4cd13
Signed-off-by: Shiwei Tian <tian.shiwei@zte.com.cn>
Diffstat (limited to 'engine-d/src/main/java/org')
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java | 4 | ||||
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java (renamed from engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java) | 4 | ||||
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java (renamed from engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java) | 26 | ||||
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java | 61 | ||||
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java | 53 | ||||
-rw-r--r-- | engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java | 3 |
6 files changed, 81 insertions, 70 deletions
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java index 69e4c7a..73c35bc 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java @@ -15,8 +15,6 @@ */ package org.onap.holmes.engine; -import static jdk.nashorn.internal.runtime.regexp.joni.Config.log; - import io.dropwizard.setup.Environment; import java.util.HashSet; import java.util.Set; @@ -28,7 +26,7 @@ import org.onap.holmes.common.config.MicroServiceConfig; import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.MSBRegisterUtil; -import org.onap.holmes.engine.dcaepolling.DcaeConfigurationPolling; +import org.onap.holmes.engine.dcae.DcaeConfigurationPolling; import org.onap.holmes.engine.resources.EngineResources; import org.onap.msb.sdk.discovery.entity.MicroServiceInfo; import org.onap.msb.sdk.discovery.entity.Node; diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java index 9fa153e..4808009 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.onap.holmes.engine.dcaepolling; +package org.onap.holmes.engine.dcae; import lombok.extern.slf4j.Slf4j; import org.onap.holmes.common.dcae.DcaeConfigurationQuery; @@ -22,7 +22,7 @@ import org.onap.holmes.common.dcae.entity.DcaeConfigurations; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; +import org.onap.holmes.engine.dmaap.SubscriberAction; @Slf4j public class DcaeConfigurationPolling implements Runnable{ diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java index 24e0817..52f3915 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.onap.holmes.engine.dmaappolling; +package org.onap.holmes.engine.dmaap; import java.util.ArrayList; import java.util.List; @@ -24,24 +24,30 @@ import org.onap.holmes.dsa.dmaappolling.Subscriber; import org.onap.holmes.engine.manager.DroolsEngine; @Slf4j -public class DMaaPPollingRequest implements Runnable { +public class DMaaPAlarmPolling implements Runnable { private Subscriber subscriber; - private DroolsEngine droolsEngine; + private volatile boolean isAlive = true; - public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) { + public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine) { this.subscriber = subscriber; this.droolsEngine = droolsEngine; } public void run() { - List<VesAlarm> vesAlarmList = new ArrayList<>(); - try { - vesAlarmList = subscriber.subscribe(); - } catch (CorrelationException e) { - log.error("Failed polling request alarm." + e.getMessage()); + while (isAlive) { + List<VesAlarm> vesAlarmList = new ArrayList<>(); + try { + vesAlarmList = subscriber.subscribe(); + } catch (CorrelationException e) { + log.error("Failed polling request alarm." + e.getMessage()); + } + vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); } - vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + } + + public void stopTask() { + isAlive = false; } } diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java new file mode 100644 index 0000000..8d80a6b --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaap; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +public class SubscriberAction { + + @Inject + private DroolsEngine droolsEngine; + private HashMap<String, DMaaPAlarmPolling> pollingTasks = new HashMap<>(); + + public synchronized void addSubscriber(Subscriber subscriber) { + if (!pollingTasks.containsKey(subscriber.getTopic())) { + DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine); + Thread thread = new Thread(pollingTask); + thread.start(); + pollingTasks.put(subscriber.getTopic(), pollingTask); + } + } + + public synchronized void removeSubscriber(Subscriber subscriber) { + if (pollingTasks.containsKey(subscriber.getTopic())) { + pollingTasks.get(subscriber.getTopic()).stopTask(); + pollingTasks.remove(subscriber.getTopic()); + } + } + + @PreDestroy + public void stopPollingTasks() { + Iterator iterator = pollingTasks.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = (Map.Entry)iterator.next(); + String key = (String) entry.getKey(); + pollingTasks.get(key).stopTask(); + } + pollingTasks.clear(); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java deleted file mode 100644 index da83683..0000000 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.dmaappolling; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import javax.inject.Inject; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.manager.DroolsEngine; - -@Service -public class SubscriberAction { - - @Inject - private DroolsEngine droolsEngine; - - private ConcurrentHashMap<String, ScheduledFuture> pollingRequests = new ConcurrentHashMap<String, ScheduledFuture>(); - private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - - public void addSubscriber(Subscriber subscriber) { - if (!pollingRequests.containsKey(subscriber.getTopic())) { - DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); - ScheduledFuture future = service - .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); - pollingRequests.put(subscriber.getTopic(), future); - } - } - - public void removeSubscriber(Subscriber subscriber) { - ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); - if (future != null) { - future.cancel(true); - } - pollingRequests.remove(subscriber.getTopic()); - } -} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java index 9fa1874..99567b8 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java @@ -16,7 +16,6 @@ package org.onap.holmes.engine.resources; -import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import javax.servlet.http.HttpServletRequest; @@ -33,7 +32,7 @@ import org.onap.holmes.common.dcae.DcaeConfigurationsCache; import org.onap.holmes.common.dcae.entity.SecurityInfo; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; +import org.onap.holmes.engine.dmaap.SubscriberAction; import org.onap.holmes.engine.request.DmaapConfigRequest; @Service |