aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java98
1 files changed, 43 insertions, 55 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e9d84640..6e9f7702 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -31,21 +32,26 @@ import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -65,22 +71,15 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
private ConsumerConfiguration dmaapConsumerConfiguration;
- Map<String, PublisherConfiguration> publishingConfigurations;
+ private Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
- private CloudConfigurationProvider cloudConfigurationProvider;
@Value("#{systemEnvironment}")
Properties systemEnvironment;
- Disposable refreshConfigTask = null;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- @Autowired
- public synchronized void setCloudConfigurationProvider(
- CloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
@@ -93,13 +92,25 @@ public class AppConfig {
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
loadConfigurationFromFile();
- refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
- .flatMap(count -> createRefreshConfigurationTask(count, context))
+ refreshConfigTask = createRefreshTask(context) //
.subscribe(e -> logger.info("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
() -> logger.error("Configuration refresh terminated"));
}
+ Flux<AppConfig> createRefreshTask(Map<String, String> context) {
+ return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient)
+ .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig)
+ .onErrorResume(this::onErrorResume);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
/**
* Stops the refreshing of the configuration.
*/
@@ -152,55 +163,31 @@ public class AppConfig {
return ftpesConfiguration;
}
- Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
- return Flux.just(counter) //
- .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
- .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
- .flatMap(this::fetchConfiguration);
- }
-
- Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
- .onErrorResume(AppConfig::onErrorResume);
- }
-
- private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
logger.error("Could not refresh application configuration {}", trowable.toString());
return Mono.empty();
}
- private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
- Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
- .onErrorResume(AppConfig::onErrorResume);
-
- // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
- // other ones does not work
- EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
- .consulHost(env.consulHost()) //
- .consulPort(env.consulPort()) //
- .cbsName(env.cbsName()) //
- .appName(env.appName() + ":dmaap") //
- .build(); //
- Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
- .onErrorResume(t -> Mono.just(new JsonObject()));
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+ }
- return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
- .onErrorResume(AppConfig::onErrorResume);
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
}
/**
* Parse configuration.
*
- * @param serviceConfigRootObject the DFC service's configuration
- * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the
- * serviceConfigRootObject
+ * @param jsonObject the DFC service's configuration
* @return this which is updated if successful
*/
- private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ private AppConfig parseCloudConfig(JsonObject jsonObject) {
try {
- CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ CloudConfigParser parser = new CloudConfigParser(jsonObject);
setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
parser.getFtpesConfig());
+
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
@@ -220,20 +207,21 @@ public class AppConfig {
if (rootObject == null) {
throw new JsonSyntaxException("Root is not a json object");
}
- parseCloudConfig(rootObject, rootObject);
+ parseCloudConfig(rootObject);
} catch (JsonSyntaxException | IOException e) {
logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
- if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
- logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfigurations, ftpesConfig);
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
} else {
this.dmaapConsumerConfiguration = consumerConfiguration;
- this.publishingConfigurations = publisherConfigurations;
+ this.publishingConfigurations = publisherConfiguration;
this.ftpesConfiguration = ftpesConfig;
}
}