diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | dpo/blueprints/k8s-sonhms-inputs.yaml | 2 | ||||
-rw-r--r-- | pom.xml | 38 | ||||
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/Application.java | 7 | ||||
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java | 81 |
5 files changed, 103 insertions, 26 deletions
@@ -5,3 +5,4 @@ target/ .classpath *.jar /bin/ +.checkstyle diff --git a/dpo/blueprints/k8s-sonhms-inputs.yaml b/dpo/blueprints/k8s-sonhms-inputs.yaml index 5659e07..0efdc1f 100644 --- a/dpo/blueprints/k8s-sonhms-inputs.yaml +++ b/dpo/blueprints/k8s-sonhms-inputs.yaml @@ -18,7 +18,7 @@ pgaas_cluster_name: dcae-pg-primary.onap database_name: sonhms -tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.0 +tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.1 replicas: 1 aaf_username: aaf_password: @@ -175,10 +175,40 @@ </dependency> <dependency> - <groupId>org.functionaljava</groupId> - <artifactId>functionaljava</artifactId> - <version>3.0</version> - </dependency> + <groupId>org.functionaljava</groupId> + <artifactId>functionaljava</artifactId> + <version>3.0</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + <version>5.0.9.RELEASE</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-expression</artifactId> + <version>5.0.9.RELEASE</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-web</artifactId> + <version>5.0.9.RELEASE</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-webmvc</artifactId> + <version>5.0.9.RELEASE</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat.embed</groupId> + <artifactId>tomcat-embed-core</artifactId> + <version>9.0.14</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.7</version> + </dependency> </dependencies> diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java index 22f458a..9919bed 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java @@ -21,6 +21,8 @@ package org.onap.dcaegen2.services.sonhms; +import java.time.Duration; + import javax.sql.DataSource; import org.onap.dcaegen2.services.sonhms.controller.ConfigFetchFromCbs; @@ -41,8 +43,9 @@ public class Application { */ public static void main(String[] args) { - ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(); - configFetchFromCbs.getAppConfig(); + ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60)); + Thread configFetchThread = new Thread(configFetchFromCbs); + configFetchThread.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java index c2e7b63..afa26d8 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java @@ -27,6 +27,7 @@ import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConfigFetchFromCbs { +import reactor.core.Disposable; + +public class ConfigFetchFromCbs implements Runnable { private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class); + private Duration interval; + + public ConfigFetchFromCbs() { + + } + + public ConfigFetchFromCbs(Duration interval) { + this.interval = interval; + } + /** * Gets app config from CBS. */ - public void getAppConfig() { + private Disposable getAppConfig() { // Generate RequestID and InvocationID which will be used when logging and in // HTTP requests @@ -58,23 +71,36 @@ public class ConfigFetchFromCbs { log.debug("environments {}", env); ConfigPolicy configPolicy = ConfigPolicy.getInstance(); + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + // Create the client and use it to get the configuration final CbsRequest request = CbsRequests.getAll(diagnosticContext); - CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> { - log.info("configuration and policy from CBS {}", jsonObject); - JsonObject config = jsonObject.getAsJsonObject("config"); - - updateConfigurationFromJsonObject(config); - - Type mapType = new TypeToken<Map<String, Object>>() { - }.getType(); - JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0) - .getAsJsonObject().getAsJsonObject("config"); - Map<String, Object> policy = new Gson().fromJson(policyJson, mapType); - configPolicy.setConfig(policy); - log.info("Config policy {}", configPolicy); - }, throwable -> log.warn("Ooops", throwable)); - + return CbsClientFactory.createCbsClient(env) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> { + log.info("configuration and policy from CBS {}", jsonObject); + JsonObject config = jsonObject.getAsJsonObject("config"); + Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt()); + if (!newPeriod.equals(period)) { + interval = newPeriod; + synchronized (this) { + this.notifyAll(); + } + + } + updateConfigurationFromJsonObject(config); + + Type mapType = new TypeToken<Map<String, Object>>() { + }.getType(); + if (jsonObject.getAsJsonObject("policies") != null) { + JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0) + .getAsJsonObject().getAsJsonObject("config"); + Map<String, Object> policy = new Gson().fromJson(policyJson, mapType); + configPolicy.setConfig(policy); + log.info("Config policy {}", configPolicy); + } + }, throwable -> log.warn("Ooops", throwable)); } private void updateConfigurationFromJsonObject(JsonObject jsonObject) { @@ -160,9 +186,26 @@ public class ConfigFetchFromCbs { configuration.setOofTriggerCountTimer(oofTriggerCountTimer); configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold); configuration.setPolicyRespTimer(policyRespTimer); - - log.info("configuration from CBS {}", configuration.toString()); + log.info("configuration from CBS {}", configuration); + + } + + @Override + public void run() { + Boolean done = false; + while (!done) { + try { + Disposable disp = getAppConfig(); + synchronized (this) { + this.wait(); + } + log.info("Polling interval changed"); + disp.dispose(); + } catch (Exception e) { + done = true; + } + } } } |