diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java | 50 |
1 files changed, 27 insertions, 23 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index b66be163..8e15deb7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -31,8 +32,10 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; @@ -46,6 +49,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -77,7 +81,7 @@ public class AppConfig { @Autowired public synchronized void setCloudConfigurationProvider( - CloudConfigurationProvider reactiveCloudConfigurationProvider) { + CloudConfigurationProvider reactiveCloudConfigurationProvider) { this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; } @@ -94,10 +98,10 @@ public class AppConfig { loadConfigurationFromFile(); refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) - .flatMap(count -> createRefreshConfigurationTask(count, context)) - .subscribe(e -> logger.info("Refreshed configuration data"), - throwable -> logger.error("Configuration refresh terminated due to exception", throwable), - () -> logger.error("Configuration refresh terminated")); + .flatMap(count -> createRefreshConfigurationTask(count, context)) + .subscribe(e -> logger.info("Refreshed configuration data"), + throwable -> logger.error("Configuration refresh terminated due to exception", throwable), + () -> logger.error("Configuration refresh terminated")); } public void stop() { @@ -116,7 +120,7 @@ public class AppConfig { } public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier) - throws DatafileTaskException { + throws DatafileTaskException { if (publishingConfiguration == null) { throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier); @@ -124,7 +128,7 @@ public class AppConfig { PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier); if (cfg == null) { throw new DatafileTaskException( - "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); + "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier); } return cfg; } @@ -135,14 +139,14 @@ public class AppConfig { Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { return Flux.just(counter) // - .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // - .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // - .flatMap(this::fetchConfiguration); + .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // + .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // + .flatMap(this::fetchConfiguration); } Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); } private static <R> Mono<R> onErrorResume(Throwable trowable) { @@ -152,21 +156,21 @@ public class AppConfig { private Mono<AppConfig> fetchConfiguration(EnvProperties env) { Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the // other ones does not work EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // - .consulHost(env.consulHost()) // - .consulPort(env.consulPort()) // - .cbsName(env.cbsName()) // - .appName(env.appName() + ":dmaap") // - .build(); // + .consulHost(env.consulHost()) // + .consulPort(env.consulPort()) // + .cbsName(env.cbsName()) // + .appName(env.appName() + ":dmaap") // + .build(); // Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) - .onErrorResume(t -> Mono.just(new JsonObject())); + .onErrorResume(t -> Mono.just(new JsonObject())); return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // - .onErrorResume(AppConfig::onErrorResume); + .onErrorResume(AppConfig::onErrorResume); } /** @@ -181,7 +185,7 @@ public class AppConfig { try { CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(), - parser.getFtpesConfig()); + parser.getFtpesConfig()); } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -208,11 +212,11 @@ public class AppConfig { } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { logger.error( - "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfiguration, ftpesConfig); + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; this.publishingConfiguration = publisherConfiguration; |