From 047720a551fdfcc6713da79759bce4bbad45f1e0 Mon Sep 17 00:00:00 2001 From: krishna Date: Mon, 12 Aug 2019 16:24:53 +0530 Subject: Update CBS client to fetch config periodically Change-Id: I29c5880072631dcd4b6425452316724fcfd7bd13 Issue-ID: DCAEGEN2-1642 Signed-off-by: krishna --- .../onap/dcaegen2/services/sonhms/Application.java | 7 +- .../sonhms/controller/ConfigFetchFromCbs.java | 81 +++++++++++++++++----- 2 files changed, 67 insertions(+), 21 deletions(-) (limited to 'src/main/java/org') diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java index 22f458a..9919bed 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java @@ -21,6 +21,8 @@ package org.onap.dcaegen2.services.sonhms; +import java.time.Duration; + import javax.sql.DataSource; import org.onap.dcaegen2.services.sonhms.controller.ConfigFetchFromCbs; @@ -41,8 +43,9 @@ public class Application { */ public static void main(String[] args) { - ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(); - configFetchFromCbs.getAppConfig(); + ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60)); + Thread configFetchThread = new Thread(configFetchFromCbs); + configFetchThread.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java index c2e7b63..afa26d8 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java @@ -27,6 +27,7 @@ import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConfigFetchFromCbs { +import reactor.core.Disposable; + +public class ConfigFetchFromCbs implements Runnable { private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class); + private Duration interval; + + public ConfigFetchFromCbs() { + + } + + public ConfigFetchFromCbs(Duration interval) { + this.interval = interval; + } + /** * Gets app config from CBS. */ - public void getAppConfig() { + private Disposable getAppConfig() { // Generate RequestID and InvocationID which will be used when logging and in // HTTP requests @@ -58,23 +71,36 @@ public class ConfigFetchFromCbs { log.debug("environments {}", env); ConfigPolicy configPolicy = ConfigPolicy.getInstance(); + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + // Create the client and use it to get the configuration final CbsRequest request = CbsRequests.getAll(diagnosticContext); - CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> { - log.info("configuration and policy from CBS {}", jsonObject); - JsonObject config = jsonObject.getAsJsonObject("config"); - - updateConfigurationFromJsonObject(config); - - Type mapType = new TypeToken>() { - }.getType(); - JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0) - .getAsJsonObject().getAsJsonObject("config"); - Map policy = new Gson().fromJson(policyJson, mapType); - configPolicy.setConfig(policy); - log.info("Config policy {}", configPolicy); - }, throwable -> log.warn("Ooops", throwable)); - + return CbsClientFactory.createCbsClient(env) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> { + log.info("configuration and policy from CBS {}", jsonObject); + JsonObject config = jsonObject.getAsJsonObject("config"); + Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt()); + if (!newPeriod.equals(period)) { + interval = newPeriod; + synchronized (this) { + this.notifyAll(); + } + + } + updateConfigurationFromJsonObject(config); + + Type mapType = new TypeToken>() { + }.getType(); + if (jsonObject.getAsJsonObject("policies") != null) { + JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0) + .getAsJsonObject().getAsJsonObject("config"); + Map policy = new Gson().fromJson(policyJson, mapType); + configPolicy.setConfig(policy); + log.info("Config policy {}", configPolicy); + } + }, throwable -> log.warn("Ooops", throwable)); } private void updateConfigurationFromJsonObject(JsonObject jsonObject) { @@ -160,9 +186,26 @@ public class ConfigFetchFromCbs { configuration.setOofTriggerCountTimer(oofTriggerCountTimer); configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold); configuration.setPolicyRespTimer(policyRespTimer); - - log.info("configuration from CBS {}", configuration.toString()); + log.info("configuration from CBS {}", configuration); + + } + + @Override + public void run() { + Boolean done = false; + while (!done) { + try { + Disposable disp = getAppConfig(); + synchronized (this) { + this.wait(); + } + log.info("Polling interval changed"); + disp.dispose(); + } catch (Exception e) { + done = true; + } + } } } -- cgit 1.2.3-korg