From b6e1b12ee06a03b4faff592df9644575d9fb58c3 Mon Sep 17 00:00:00 2001 From: Shiwei Tian Date: Thu, 26 Oct 2017 17:13:07 +0800 Subject: modify dmaap request way Issue-ID: HOLMES-71 Change-Id: Ie6bc1234fd0108e651635e8f8bfb45b0dcb4cd13 Signed-off-by: Shiwei Tian --- .../org/onap/holmes/engine/EngineDActiveApp.java | 4 +- .../engine/dcae/DcaeConfigurationPolling.java | 64 ++++++++++++++++++++++ .../dcaepolling/DcaeConfigurationPolling.java | 64 ---------------------- .../holmes/engine/dmaap/DMaaPAlarmPolling.java | 53 ++++++++++++++++++ .../onap/holmes/engine/dmaap/SubscriberAction.java | 61 +++++++++++++++++++++ .../engine/dmaappolling/DMaaPPollingRequest.java | 47 ---------------- .../engine/dmaappolling/SubscriberAction.java | 53 ------------------ .../resources/DmaapConfigurationService.java | 3 +- 8 files changed, 180 insertions(+), 169 deletions(-) create mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java delete mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java create mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java create mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java delete mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java delete mode 100644 engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java (limited to 'engine-d/src/main/java/org/onap') diff --git a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java index 69e4c7a..73c35bc 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java @@ -15,8 +15,6 @@ */ package org.onap.holmes.engine; -import static jdk.nashorn.internal.runtime.regexp.joni.Config.log; - import io.dropwizard.setup.Environment; import java.util.HashSet; import java.util.Set; @@ -28,7 +26,7 @@ import org.onap.holmes.common.config.MicroServiceConfig; import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.MSBRegisterUtil; -import org.onap.holmes.engine.dcaepolling.DcaeConfigurationPolling; +import org.onap.holmes.engine.dcae.DcaeConfigurationPolling; import org.onap.holmes.engine.resources.EngineResources; import org.onap.msb.sdk.discovery.entity.MicroServiceInfo; import org.onap.msb.sdk.discovery.entity.Node; diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java new file mode 100644 index 0000000..4808009 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dcae/DcaeConfigurationPolling.java @@ -0,0 +1,64 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dcae; + +import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.dcae.DcaeConfigurationQuery; +import org.onap.holmes.common.dcae.DcaeConfigurationsCache; +import org.onap.holmes.common.dcae.entity.DcaeConfigurations; +import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.dmaap.SubscriberAction; + +@Slf4j +public class DcaeConfigurationPolling implements Runnable{ + + private String hostname; + + private String subscriberKey = "sec_fault_unsecure"; + + public static long POLLING_PERIOD = 10 * 1000L; + + public DcaeConfigurationPolling(String hostname) { + this.hostname = hostname; + } + + @Override + public void run() { + DcaeConfigurations dcaeConfigurations = null; + try { + dcaeConfigurations = DcaeConfigurationQuery + .getDcaeConfigurations(hostname); + } catch (CorrelationException e) { + log.error("Failed to polling dcae configurations" + e.getMessage()); + } + if (dcaeConfigurations != null) { + DcaeConfigurationsCache.setDcaeConfigurations(dcaeConfigurations); + addSubscriber(dcaeConfigurations); + } + } + + private void addSubscriber(DcaeConfigurations dcaeConfigurations) { + SubscriberAction subscriberAction = ServiceLocatorHolder.getLocator() + .getService(SubscriberAction.class); + Subscriber subscriber = new Subscriber(); + subscriber.setTopic(subscriberKey); + subscriber.setUrl(dcaeConfigurations.getSubSecInfo(subscriberKey).getDmaapInfo() + .getTopicUrl()); + subscriberAction.addSubscriber(subscriber); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java deleted file mode 100644 index 9fa153e..0000000 --- a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.dcaepolling; - -import lombok.extern.slf4j.Slf4j; -import org.onap.holmes.common.dcae.DcaeConfigurationQuery; -import org.onap.holmes.common.dcae.DcaeConfigurationsCache; -import org.onap.holmes.common.dcae.entity.DcaeConfigurations; -import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; - -@Slf4j -public class DcaeConfigurationPolling implements Runnable{ - - private String hostname; - - private String subscriberKey = "sec_fault_unsecure"; - - public static long POLLING_PERIOD = 10 * 1000L; - - public DcaeConfigurationPolling(String hostname) { - this.hostname = hostname; - } - - @Override - public void run() { - DcaeConfigurations dcaeConfigurations = null; - try { - dcaeConfigurations = DcaeConfigurationQuery - .getDcaeConfigurations(hostname); - } catch (CorrelationException e) { - log.error("Failed to polling dcae configurations" + e.getMessage()); - } - if (dcaeConfigurations != null) { - DcaeConfigurationsCache.setDcaeConfigurations(dcaeConfigurations); - addSubscriber(dcaeConfigurations); - } - } - - private void addSubscriber(DcaeConfigurations dcaeConfigurations) { - SubscriberAction subscriberAction = ServiceLocatorHolder.getLocator() - .getService(SubscriberAction.class); - Subscriber subscriber = new Subscriber(); - subscriber.setTopic(subscriberKey); - subscriber.setUrl(dcaeConfigurations.getSubSecInfo(subscriberKey).getDmaapInfo() - .getTopicUrl()); - subscriberAction.addSubscriber(subscriber); - } -} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java new file mode 100644 index 0000000..52f3915 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/DMaaPAlarmPolling.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaap; + +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.api.stat.VesAlarm; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Slf4j +public class DMaaPAlarmPolling implements Runnable { + + private Subscriber subscriber; + private DroolsEngine droolsEngine; + private volatile boolean isAlive = true; + + public DMaaPAlarmPolling(Subscriber subscriber, DroolsEngine droolsEngine) { + this.subscriber = subscriber; + this.droolsEngine = droolsEngine; + } + + public void run() { + while (isAlive) { + List vesAlarmList = new ArrayList<>(); + try { + vesAlarmList = subscriber.subscribe(); + } catch (CorrelationException e) { + log.error("Failed polling request alarm." + e.getMessage()); + } + vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); + } + } + + public void stopTask() { + isAlive = false; + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java new file mode 100644 index 0000000..8d80a6b --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaap/SubscriberAction.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dmaap; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.manager.DroolsEngine; + +@Service +public class SubscriberAction { + + @Inject + private DroolsEngine droolsEngine; + private HashMap pollingTasks = new HashMap<>(); + + public synchronized void addSubscriber(Subscriber subscriber) { + if (!pollingTasks.containsKey(subscriber.getTopic())) { + DMaaPAlarmPolling pollingTask = new DMaaPAlarmPolling(subscriber, droolsEngine); + Thread thread = new Thread(pollingTask); + thread.start(); + pollingTasks.put(subscriber.getTopic(), pollingTask); + } + } + + public synchronized void removeSubscriber(Subscriber subscriber) { + if (pollingTasks.containsKey(subscriber.getTopic())) { + pollingTasks.get(subscriber.getTopic()).stopTask(); + pollingTasks.remove(subscriber.getTopic()); + } + } + + @PreDestroy + public void stopPollingTasks() { + Iterator iterator = pollingTasks.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = (Map.Entry)iterator.next(); + String key = (String) entry.getKey(); + pollingTasks.get(key).stopTask(); + } + pollingTasks.clear(); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java deleted file mode 100644 index 24e0817..0000000 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/DMaaPPollingRequest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.dmaappolling; - -import java.util.ArrayList; -import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.onap.holmes.common.api.stat.VesAlarm; -import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.manager.DroolsEngine; - -@Slf4j -public class DMaaPPollingRequest implements Runnable { - - private Subscriber subscriber; - - private DroolsEngine droolsEngine; - - public DMaaPPollingRequest(Subscriber subscriber, DroolsEngine droolsEngine) { - this.subscriber = subscriber; - this.droolsEngine = droolsEngine; - } - - public void run() { - List vesAlarmList = new ArrayList<>(); - try { - vesAlarmList = subscriber.subscribe(); - } catch (CorrelationException e) { - log.error("Failed polling request alarm." + e.getMessage()); - } - vesAlarmList.forEach(vesAlarm -> droolsEngine.putRaisedIntoStream(vesAlarm)); - } -} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java deleted file mode 100644 index da83683..0000000 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.engine.dmaappolling; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import javax.inject.Inject; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.manager.DroolsEngine; - -@Service -public class SubscriberAction { - - @Inject - private DroolsEngine droolsEngine; - - private ConcurrentHashMap pollingRequests = new ConcurrentHashMap(); - private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - - public void addSubscriber(Subscriber subscriber) { - if (!pollingRequests.containsKey(subscriber.getTopic())) { - DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); - ScheduledFuture future = service - .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); - pollingRequests.put(subscriber.getTopic(), future); - } - } - - public void removeSubscriber(Subscriber subscriber) { - ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); - if (future != null) { - future.cancel(true); - } - pollingRequests.remove(subscriber.getTopic()); - } -} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java index 9fa1874..99567b8 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/resources/DmaapConfigurationService.java @@ -16,7 +16,6 @@ package org.onap.holmes.engine.resources; -import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import javax.servlet.http.HttpServletRequest; @@ -33,7 +32,7 @@ import org.onap.holmes.common.dcae.DcaeConfigurationsCache; import org.onap.holmes.common.dcae.entity.SecurityInfo; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.dsa.dmaappolling.Subscriber; -import org.onap.holmes.engine.dmaappolling.SubscriberAction; +import org.onap.holmes.engine.dmaap.SubscriberAction; import org.onap.holmes.engine.request.DmaapConfigRequest; @Service -- cgit 1.2.3-korg