summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorkrishna <krishna.moorthy6@wipro.com>2019-08-12 16:24:53 +0530
committerkrishna <krishna.moorthy6@wipro.com>2019-08-19 11:29:52 +0530
commit047720a551fdfcc6713da79759bce4bbad45f1e0 (patch)
treea08588d61c901af34c2abe642b611d3e695614be /src/main
parentae9e6686a97bfcefb429fefbe998a49af5fbc71c (diff)
Update CBS client to fetch config periodically
Change-Id: I29c5880072631dcd4b6425452316724fcfd7bd13 Issue-ID: DCAEGEN2-1642 Signed-off-by: krishna <krishna.moorthy6@wipro.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/Application.java7
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java81
2 files changed, 67 insertions, 21 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
index 22f458a..9919bed 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
@@ -21,6 +21,8 @@
package org.onap.dcaegen2.services.sonhms;
+import java.time.Duration;
+
import javax.sql.DataSource;
import org.onap.dcaegen2.services.sonhms.controller.ConfigFetchFromCbs;
@@ -41,8 +43,9 @@ public class Application {
*/
public static void main(String[] args) {
- ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs();
- configFetchFromCbs.getAppConfig();
+ ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60));
+ Thread configFetchThread = new Thread(configFetchFromCbs);
+ configFetchThread.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
index c2e7b63..afa26d8 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConfigFetchFromCbs {
+import reactor.core.Disposable;
+
+public class ConfigFetchFromCbs implements Runnable {
private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
+ private Duration interval;
+
+ public ConfigFetchFromCbs() {
+
+ }
+
+ public ConfigFetchFromCbs(Duration interval) {
+ this.interval = interval;
+ }
+
/**
* Gets app config from CBS.
*/
- public void getAppConfig() {
+ private Disposable getAppConfig() {
// Generate RequestID and InvocationID which will be used when logging and in
// HTTP requests
@@ -58,23 +71,36 @@ public class ConfigFetchFromCbs {
log.debug("environments {}", env);
ConfigPolicy configPolicy = ConfigPolicy.getInstance();
+ // Polling properties
+ final Duration initialDelay = Duration.ofSeconds(5);
+ final Duration period = interval;
+
// Create the client and use it to get the configuration
final CbsRequest request = CbsRequests.getAll(diagnosticContext);
- CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> {
- log.info("configuration and policy from CBS {}", jsonObject);
- JsonObject config = jsonObject.getAsJsonObject("config");
-
- updateConfigurationFromJsonObject(config);
-
- Type mapType = new TypeToken<Map<String, Object>>() {
- }.getType();
- JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
- .getAsJsonObject().getAsJsonObject("config");
- Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
- configPolicy.setConfig(policy);
- log.info("Config policy {}", configPolicy);
- }, throwable -> log.warn("Ooops", throwable));
-
+ return CbsClientFactory.createCbsClient(env)
+ .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
+ log.info("configuration and policy from CBS {}", jsonObject);
+ JsonObject config = jsonObject.getAsJsonObject("config");
+ Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
+ if (!newPeriod.equals(period)) {
+ interval = newPeriod;
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ }
+ updateConfigurationFromJsonObject(config);
+
+ Type mapType = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ if (jsonObject.getAsJsonObject("policies") != null) {
+ JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
+ .getAsJsonObject().getAsJsonObject("config");
+ Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
+ configPolicy.setConfig(policy);
+ log.info("Config policy {}", configPolicy);
+ }
+ }, throwable -> log.warn("Ooops", throwable));
}
private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
@@ -160,9 +186,26 @@ public class ConfigFetchFromCbs {
configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
configuration.setPolicyRespTimer(policyRespTimer);
-
- log.info("configuration from CBS {}", configuration.toString());
+ log.info("configuration from CBS {}", configuration);
+
+ }
+
+ @Override
+ public void run() {
+ Boolean done = false;
+ while (!done) {
+ try {
+ Disposable disp = getAppConfig();
+ synchronized (this) {
+ this.wait();
+ }
+ log.info("Polling interval changed");
+ disp.dispose();
+ } catch (Exception e) {
+ done = true;
+ }
+ }
}
}