aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--dpo/blueprints/k8s-sonhms-inputs.yaml2
-rw-r--r--pom.xml38
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/Application.java7
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java81
5 files changed, 103 insertions, 26 deletions
diff --git a/.gitignore b/.gitignore
index 26bb426..14ef475 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ target/
.classpath
*.jar
/bin/
+.checkstyle
diff --git a/dpo/blueprints/k8s-sonhms-inputs.yaml b/dpo/blueprints/k8s-sonhms-inputs.yaml
index 5659e07..0efdc1f 100644
--- a/dpo/blueprints/k8s-sonhms-inputs.yaml
+++ b/dpo/blueprints/k8s-sonhms-inputs.yaml
@@ -18,7 +18,7 @@
pgaas_cluster_name: dcae-pg-primary.onap
database_name: sonhms
-tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.0
+tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.son-handler:1.1.1
replicas: 1
aaf_username:
aaf_password:
diff --git a/pom.xml b/pom.xml
index 14f8dff..ef1c771 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,10 +175,40 @@
</dependency>
<dependency>
- <groupId>org.functionaljava</groupId>
- <artifactId>functionaljava</artifactId>
- <version>3.0</version>
- </dependency>
+ <groupId>org.functionaljava</groupId>
+ <artifactId>functionaljava</artifactId>
+ <version>3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-expression</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webmvc</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tomcat.embed</groupId>
+ <artifactId>tomcat-embed-core</artifactId>
+ <version>9.0.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.7</version>
+ </dependency>
</dependencies>
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
index 22f458a..9919bed 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/Application.java
@@ -21,6 +21,8 @@
package org.onap.dcaegen2.services.sonhms;
+import java.time.Duration;
+
import javax.sql.DataSource;
import org.onap.dcaegen2.services.sonhms.controller.ConfigFetchFromCbs;
@@ -41,8 +43,9 @@ public class Application {
*/
public static void main(String[] args) {
- ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs();
- configFetchFromCbs.getAppConfig();
+ ConfigFetchFromCbs configFetchFromCbs = new ConfigFetchFromCbs(Duration.ofSeconds(60));
+ Thread configFetchThread = new Thread(configFetchFromCbs);
+ configFetchThread.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
index c2e7b63..afa26d8 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/controller/ConfigFetchFromCbs.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -40,14 +41,26 @@ import org.onap.dcaegen2.services.sonhms.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConfigFetchFromCbs {
+import reactor.core.Disposable;
+
+public class ConfigFetchFromCbs implements Runnable {
private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
+ private Duration interval;
+
+ public ConfigFetchFromCbs() {
+
+ }
+
+ public ConfigFetchFromCbs(Duration interval) {
+ this.interval = interval;
+ }
+
/**
* Gets app config from CBS.
*/
- public void getAppConfig() {
+ private Disposable getAppConfig() {
// Generate RequestID and InvocationID which will be used when logging and in
// HTTP requests
@@ -58,23 +71,36 @@ public class ConfigFetchFromCbs {
log.debug("environments {}", env);
ConfigPolicy configPolicy = ConfigPolicy.getInstance();
+ // Polling properties
+ final Duration initialDelay = Duration.ofSeconds(5);
+ final Duration period = interval;
+
// Create the client and use it to get the configuration
final CbsRequest request = CbsRequests.getAll(diagnosticContext);
- CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> {
- log.info("configuration and policy from CBS {}", jsonObject);
- JsonObject config = jsonObject.getAsJsonObject("config");
-
- updateConfigurationFromJsonObject(config);
-
- Type mapType = new TypeToken<Map<String, Object>>() {
- }.getType();
- JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
- .getAsJsonObject().getAsJsonObject("config");
- Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
- configPolicy.setConfig(policy);
- log.info("Config policy {}", configPolicy);
- }, throwable -> log.warn("Ooops", throwable));
-
+ return CbsClientFactory.createCbsClient(env)
+ .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
+ log.info("configuration and policy from CBS {}", jsonObject);
+ JsonObject config = jsonObject.getAsJsonObject("config");
+ Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
+ if (!newPeriod.equals(period)) {
+ interval = newPeriod;
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ }
+ updateConfigurationFromJsonObject(config);
+
+ Type mapType = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ if (jsonObject.getAsJsonObject("policies") != null) {
+ JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
+ .getAsJsonObject().getAsJsonObject("config");
+ Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
+ configPolicy.setConfig(policy);
+ log.info("Config policy {}", configPolicy);
+ }
+ }, throwable -> log.warn("Ooops", throwable));
}
private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
@@ -160,9 +186,26 @@ public class ConfigFetchFromCbs {
configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
configuration.setPolicyRespTimer(policyRespTimer);
-
- log.info("configuration from CBS {}", configuration.toString());
+ log.info("configuration from CBS {}", configuration);
+
+ }
+
+ @Override
+ public void run() {
+ Boolean done = false;
+ while (!done) {
+ try {
+ Disposable disp = getAppConfig();
+ synchronized (this) {
+ this.wait();
+ }
+ log.info("Polling interval changed");
+ disp.dispose();
+ } catch (Exception e) {
+ done = true;
+ }
+ }
}
}