aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java50
1 files changed, 27 insertions, 23 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index b66be163..8e15deb7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -31,8 +32,10 @@ import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
@@ -46,6 +49,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -77,7 +81,7 @@ public class AppConfig {
@Autowired
public synchronized void setCloudConfigurationProvider(
- CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
@@ -94,10 +98,10 @@ public class AppConfig {
loadConfigurationFromFile();
refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
- .flatMap(count -> createRefreshConfigurationTask(count, context))
- .subscribe(e -> logger.info("Refreshed configuration data"),
- throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
- () -> logger.error("Configuration refresh terminated"));
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
}
public void stop() {
@@ -116,7 +120,7 @@ public class AppConfig {
}
public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
- throws DatafileTaskException {
+ throws DatafileTaskException {
if (publishingConfiguration == null) {
throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
@@ -124,7 +128,7 @@ public class AppConfig {
PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
if (cfg == null) {
throw new DatafileTaskException(
- "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
}
return cfg;
}
@@ -135,14 +139,14 @@ public class AppConfig {
Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
return Flux.just(counter) //
- .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
- .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
- .flatMap(this::fetchConfiguration);
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
}
Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
- .onErrorResume(AppConfig::onErrorResume);
+ .onErrorResume(AppConfig::onErrorResume);
}
private static <R> Mono<R> onErrorResume(Throwable trowable) {
@@ -152,21 +156,21 @@ public class AppConfig {
private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
- .onErrorResume(AppConfig::onErrorResume);
+ .onErrorResume(AppConfig::onErrorResume);
// Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
// other ones does not work
EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
- .consulHost(env.consulHost()) //
- .consulPort(env.consulPort()) //
- .cbsName(env.cbsName()) //
- .appName(env.appName() + ":dmaap") //
- .build(); //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
- .onErrorResume(t -> Mono.just(new JsonObject()));
+ .onErrorResume(t -> Mono.just(new JsonObject()));
return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
- .onErrorResume(AppConfig::onErrorResume);
+ .onErrorResume(AppConfig::onErrorResume);
}
/**
@@ -181,7 +185,7 @@ public class AppConfig {
try {
CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
- parser.getFtpesConfig());
+ parser.getFtpesConfig());
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
@@ -208,11 +212,11 @@ public class AppConfig {
}
private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
logger.error(
- "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfiguration, ftpesConfig);
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
} else {
this.dmaapConsumerConfiguration = consumerConfiguration;
this.publishingConfiguration = publisherConfiguration;