diff options
author | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-10-14 10:45:09 +0800 |
---|---|---|
committer | Shiwei Tian <tian.shiwei@zte.com.cn> | 2017-10-14 10:56:39 +0800 |
commit | e540d3894d2f662a843e5c51535d44b876b9889f (patch) | |
tree | 469d7d56958242ad2774437ae3ea9d360a5b6475 /engine-d/src/main | |
parent | 64fcc24209acc0c1d8097e2d10bd9cdef9c2b37c (diff) |
modify not publish messages to DMaaP
Issue-ID: HOLMES-71
Change-Id: Iaf87f0250d043aae84e9afc6aaec0f6cdc0a529c
Signed-off-by: Shiwei Tian <tian.shiwei@zte.com.cn>
Diffstat (limited to 'engine-d/src/main')
5 files changed, 86 insertions, 7 deletions
diff --git a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java index 5bb3dbe..69e4c7a 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/EngineDActiveApp.java @@ -20,11 +20,15 @@ import static jdk.nashorn.internal.runtime.regexp.joni.Config.log; import io.dropwizard.setup.Environment; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.onap.holmes.common.config.MicroServiceConfig; import org.onap.holmes.common.dropwizard.ioc.bundle.IOCApplication; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.MSBRegisterUtil; +import org.onap.holmes.engine.dcaepolling.DcaeConfigurationPolling; import org.onap.holmes.engine.resources.EngineResources; import org.onap.msb.sdk.discovery.entity.MicroServiceInfo; import org.onap.msb.sdk.discovery.entity.Node; @@ -40,6 +44,11 @@ public class EngineDActiveApp extends IOCApplication<EngineDAppConfig> { super.run(configuration, environment); environment.jersey().register(new EngineResources()); + + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + service.scheduleAtFixedRate(new DcaeConfigurationPolling("holmes-rule-mgmt"), 0, + DcaeConfigurationPolling.POLLING_PERIOD, TimeUnit.MILLISECONDS); + try { new MSBRegisterUtil().register2Msb(createMicroServiceInfo()); } catch (CorrelationException e) { diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java b/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java new file mode 100644 index 0000000..23030d2 --- /dev/null +++ b/engine-d/src/main/java/org/onap/holmes/engine/dcaepolling/DcaeConfigurationPolling.java @@ -0,0 +1,63 @@ +/** + * Copyright 2017 ZTE Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.engine.dcaepolling; + +import lombok.extern.slf4j.Slf4j; +import org.onap.holmes.common.dcae.DcaeConfigurationQuery; +import org.onap.holmes.common.dcae.DcaeConfigurationsCache; +import org.onap.holmes.common.dcae.entity.DcaeConfigurations; +import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.dsa.dmaappolling.Subscriber; +import org.onap.holmes.engine.dmaappolling.SubscriberAction; + +@Slf4j +public class DcaeConfigurationPolling implements Runnable{ + + private String hostname; + + private String subscriberKey = "sec_fault_unsecure"; + + public static long POLLING_PERIOD = 10 * 1000L; + + public DcaeConfigurationPolling(String hostname) { + this.hostname = hostname; + } + + @Override + public void run() { + DcaeConfigurations dcaeConfigurations = null; + try { + dcaeConfigurations = DcaeConfigurationQuery + .getDcaeConfigurations(hostname); + } catch (CorrelationException e) { + log.error("Failed to polling dcae configurations" + e.getMessage()); + } + if (dcaeConfigurations != null) { + DcaeConfigurationsCache.setDcaeConfigurations(dcaeConfigurations); + addSubscriber(dcaeConfigurations); + } + } + + private void addSubscriber(DcaeConfigurations dcaeConfigurations) { + SubscriberAction subscriberAction = ServiceLocatorHolder.getLocator() + .getService(SubscriberAction.class); + Subscriber subscriber = new Subscriber(); + subscriber.setUrl(dcaeConfigurations.getSubSecInfo(subscriberKey).getDmaapInfo() + .getTopicUrl()); + subscriberAction.addSubscriber(subscriber); + } +} diff --git a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java index 1e71899..ef585d5 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/dmaappolling/SubscriberAction.java @@ -35,17 +35,19 @@ public class SubscriberAction { private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); public void addSubscriber(Subscriber subscriber) { - DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); - ScheduledFuture future = service - .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); - pollingRequests.put(subscriber.getTopic(), future); + if (!pollingRequests.containsKey(subscriber.getUrl())) { + DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine); + ScheduledFuture future = service + .scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS); + pollingRequests.put(subscriber.getUrl(), future); + } } public void removeSubscriber(Subscriber subscriber) { - ScheduledFuture future = pollingRequests.get(subscriber.getTopic()); + ScheduledFuture future = pollingRequests.get(subscriber.getUrl()); if (future != null) { future.cancel(true); } - pollingRequests.remove(subscriber.getTopic()); + pollingRequests.remove(subscriber.getUrl()); } } diff --git a/engine-d/src/main/java/org/onap/holmes/engine/request/DeployRuleRequest.java b/engine-d/src/main/java/org/onap/holmes/engine/request/DeployRuleRequest.java index de9d773..5b8951b 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/request/DeployRuleRequest.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/request/DeployRuleRequest.java @@ -30,4 +30,7 @@ public class DeployRuleRequest { @JsonProperty(value = "engineid") private String engineId; + + @JsonProperty(value = "loopcontrolname") + private String loopControlName; } diff --git a/engine-d/src/main/java/org/onap/holmes/engine/resources/EngineResources.java b/engine-d/src/main/java/org/onap/holmes/engine/resources/EngineResources.java index 8f9a271..45754e2 100644 --- a/engine-d/src/main/java/org/onap/holmes/engine/resources/EngineResources.java +++ b/engine-d/src/main/java/org/onap/holmes/engine/resources/EngineResources.java @@ -33,6 +33,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import lombok.extern.slf4j.Slf4j; import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.dmaap.DmaapService; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.ExceptionUtil; import org.onap.holmes.common.utils.LanguageUtil; @@ -65,8 +66,9 @@ public class EngineResources { CorrelationRuleResponse crResponse = new CorrelationRuleResponse(); Locale locale = LanguageUtil.getLocale(httpRequest); try { - String packageName = droolsEngine.deployRule(deployRuleRequest, locale); + DmaapService.loopControlNames + .put(packageName, deployRuleRequest.getLoopControlName()); log.info("Rule deployed. Package name: " + packageName); crResponse.setPackageName(packageName); |