diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java | 98 |
1 files changed, 43 insertions, 55 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e9d84640..6e9f7702 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -31,21 +32,26 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,22 +71,15 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); private ConsumerConfiguration dmaapConsumerConfiguration; - Map<String, PublisherConfiguration> publishingConfigurations; + private Map<String, PublisherConfiguration> publishingConfigurations; private FtpesConfig ftpesConfiguration; - private CloudConfigurationProvider cloudConfigurationProvider; @Value("#{systemEnvironment}") Properties systemEnvironment; - Disposable refreshConfigTask = null; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - @Autowired - public synchronized void setCloudConfigurationProvider( - CloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - public synchronized void setFilepath(String filepath) { this.filepath = filepath; } @@ -93,13 +92,25 @@ public class AppConfig { Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); loadConfigurationFromFile(); - refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) - .flatMap(count -> createRefreshConfigurationTask(count, context)) + refreshConfigTask = createRefreshTask(context) // .subscribe(e -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } + Flux<AppConfig> createRefreshTask(Map<String, String> context) { + return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient) + .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig) + .onErrorResume(this::onErrorResume); + } + + private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) { + final Duration initialDelay = Duration.ZERO; + final Duration refreshPeriod = Duration.ofMinutes(1); + final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); + return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); + } + /** * Stops the refreshing of the configuration. */ @@ -152,55 +163,31 @@ public class AppConfig { return ftpesConfiguration; } - Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { - return Flux.just(counter) // - .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // - .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // - .flatMap(this::fetchConfiguration); - } - - Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) - .onErrorResume(AppConfig::onErrorResume); - } - - private static <R> Mono<R> onErrorResume(Throwable trowable) { + private <R> Mono<R> onErrorResume(Throwable trowable) { logger.error("Could not refresh application configuration {}", trowable.toString()); return Mono.empty(); } - private Mono<AppConfig> fetchConfiguration(EnvProperties env) { - Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // - .onErrorResume(AppConfig::onErrorResume); - - // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the - // other ones does not work - EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // - .consulHost(env.consulHost()) // - .consulPort(env.consulPort()) // - .cbsName(env.cbsName()) // - .appName(env.appName() + ":dmaap") // - .build(); // - Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) - .onErrorResume(t -> Mono.just(new JsonObject())); + Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context); + } - return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // - .onErrorResume(AppConfig::onErrorResume); + Mono<CbsClient> createCbsClient(EnvProperties env) { + return CbsClientFactory.createCbsClient(env); } /** * Parse configuration. * - * @param serviceConfigRootObject the DFC service's configuration - * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the - * serviceConfigRootObject + * @param jsonObject the DFC service's configuration * @return this which is updated if successful */ - private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + private AppConfig parseCloudConfig(JsonObject jsonObject) { try { - CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + CloudConfigParser parser = new CloudConfigParser(jsonObject); setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig()); + } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -220,20 +207,21 @@ public class AppConfig { if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } - parseCloudConfig(rootObject, rootObject); + parseCloudConfig(rootObject); } catch (JsonSyntaxException | IOException e) { logger.warn("Local configuration file not loaded: {}", filepath, e); } } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) { - if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) { - logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfigurations, ftpesConfig); + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; - this.publishingConfigurations = publisherConfigurations; + this.publishingConfigurations = publisherConfiguration; this.ftpesConfiguration = ftpesConfig; } } |