From 1599617246f0ffec1b1c7840f9c7c42318183dcd Mon Sep 17 00:00:00 2001 From: Bogumil Zebek Date: Fri, 30 Oct 2020 10:23:45 +0100 Subject: Fetch configuration from CBS - Fix memory leak. - Add reactive configuration fetching from Consul. Now configuration is updated when any change in VES configuration has been done in Consul. Change-Id: I9cd42e04844c9e99d4d03951185523b569dc9483 Issue-ID: DCAEGEN2-2495 Signed-off-by: Zebek Bogumil --- .../dcae/ApplicationConfigurationListener.java | 105 ++++++++++++++++++++ src/main/java/org/onap/dcae/VesApplication.java | 50 +++++----- .../org/onap/dcae/configuration/ConfigLoader.java | 99 ------------------- .../dcae/configuration/ConfigLoaderFactory.java | 37 ------- .../org/onap/dcae/configuration/ConfigParsing.java | 6 +- .../org/onap/dcae/configuration/ConfigUpdater.java | 95 ++++++++++++++++++ .../dcae/configuration/ConfigUpdaterFactory.java | 36 +++++++ .../dcae/configuration/ConfigurationHandler.java | 109 +++++++++++++++++++++ .../cbs/CbsClientConfigurationProvider.java | 85 ++++++++++++++++ .../cbs/CbsClientConfigurationResolver.java | 60 ------------ .../dcae/configuration/cbs/CbsConfigResolver.java | 58 ----------- .../cbs/CbsConfigResolverFactory.java | 28 ------ 12 files changed, 456 insertions(+), 312 deletions(-) create mode 100644 src/main/java/org/onap/dcae/ApplicationConfigurationListener.java delete mode 100644 src/main/java/org/onap/dcae/configuration/ConfigLoader.java delete mode 100644 src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java create mode 100644 src/main/java/org/onap/dcae/configuration/ConfigUpdater.java create mode 100644 src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java create mode 100644 src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java create mode 100644 src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java delete mode 100644 src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java delete mode 100644 src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java delete mode 100644 src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java (limited to 'src/main/java/org/onap') diff --git a/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java new file mode 100644 index 00000000..b86bc1ec --- /dev/null +++ b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcae; + +import org.onap.dcae.configuration.ConfigurationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +import java.time.Duration; + +/** + * ApplicationConfigurationListener is used to listen at notifications with configuration updates send from Consul. + */ +public class ApplicationConfigurationListener implements Runnable { + + private static Logger log = LoggerFactory.getLogger(ApplicationConfigurationListener.class); + + private Duration interval; + private boolean terminate = false; + private final ConfigurationHandler configurationHandler; + + /** + * Constructor + * @param interval defines period of time when notification can come + * @param configurationHandler handles notifications send by Consul + */ + public ApplicationConfigurationListener(Duration interval, ConfigurationHandler configurationHandler) { + this.interval = interval; + this.configurationHandler = configurationHandler; + } + + /** + * Reload listener to start listening for Consul notifications with defined interval. + * @param interval defines period of time when notification can come + */ + public synchronized void reload(Duration interval) { + this.interval = interval; + log.info("Handler configuration was changed. Need to reload configuration handler."); + sendReloadAction(); + } + + synchronized void sendReloadAction() { + this.notifyAll(); + } + + /** + * Start listening for Consul notification. + */ + @Override + public void run() { + Disposable consulListener = null; + do { + try { + consulListener = listenForConfigurationUpdates(); + synchronized (this) { + log.info("Switch to configuration handler thread. Active waiting for configuration from Consul."); + this.wait(); + } + } catch (Exception e) { + log.error("Unexpected error occurred during handling data from Consul.", e); + terminate(); + } finally { + stopListeningForConfigurationUpdates(consulListener); + } + } while (!this.terminate); + } + + private Disposable listenForConfigurationUpdates() { + return this.configurationHandler.startListen(this.interval); + } + + void terminate() { + this.terminate = true; + } + + /** + * Release resources when there is a need to stop listener + * @param consulListener Handler to Consul listener + */ + void stopListeningForConfigurationUpdates(Disposable consulListener) { + if (consulListener != null) { + consulListener.dispose(); + } + } +} diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java index e5ee6e35..ec04157f 100644 --- a/src/main/java/org/onap/dcae/VesApplication.java +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -27,8 +27,10 @@ import org.onap.dcae.common.validator.StndDefinedValidatorResolver; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.onap.dcae.common.publishing.PublisherConfig; -import org.onap.dcae.configuration.ConfigLoader; -import org.onap.dcae.configuration.ConfigLoaderFactory; +import org.onap.dcae.configuration.ConfigurationHandler; +import org.onap.dcae.configuration.ConfigUpdater; +import org.onap.dcae.configuration.ConfigUpdaterFactory; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; import org.onap.dcaegen2.services.sdk.services.external.schema.manager.service.StndDefinedValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,37 +44,38 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import java.nio.file.Paths; -import java.util.concurrent.ScheduledFuture; +import java.time.Duration; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class VesApplication { + private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5; + private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input"); private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error"); private static ApplicationSettings applicationSettings; private static ConfigurableApplicationContext context; - private static ConfigLoader configLoader; - private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + private static ConfigUpdater configUpdater; private static DMaaPEventPublisher eventPublisher; - private static ScheduledFuture scheduleFeatures; + private static ApplicationConfigurationListener applicationConfigurationListener; public static void main(String[] args) { SpringApplication app = new SpringApplication(VesApplication.class); applicationSettings = new ApplicationSettings(args, CLIUtils::processCmdLine); - scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); init(); + + applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul(); + app.setAddCommandLineProperties(true); context = app.run(); - configLoader.updateConfig(); } public static void restartApplication() { Thread thread = new Thread(() -> { context.close(); applicationSettings.reloadProperties(); - scheduleFeatures.cancel(true); + applicationConfigurationListener.reload(Duration.ofMinutes(applicationSettings.configurationUpdateFrequency())); init(); context = SpringApplication.run(VesApplication.class); }); @@ -81,26 +84,20 @@ public class VesApplication { } private static void init() { - createConfigLoader(); - createSchedulePoolExecutor(); - createExecutors(); - } - - private static void createExecutors() { + configUpdater = ConfigUpdaterFactory.create( + applicationSettings.configurationFileLocation(), + Paths.get(applicationSettings.dMaaPConfigurationFileLocation())); eventPublisher = new DMaaPEventPublisher(getDmaapConfig()); } - private static void createSchedulePoolExecutor() { - scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, - applicationSettings.configurationUpdateFrequency(), - applicationSettings.configurationUpdateFrequency(), - TimeUnit.MINUTES); - } + private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() { + ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater); + ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler); - private static void createConfigLoader() { - configLoader = ConfigLoaderFactory.create( - applicationSettings.configurationFileLocation(), - Paths.get(applicationSettings.dMaaPConfigurationFileLocation())); + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + scheduledThreadPoolExecutor.execute(applicationConfigProvider); + + return applicationConfigProvider; } private static Map getDmaapConfig() { @@ -136,5 +133,4 @@ public class VesApplication { public StndDefinedValidator getStndDefinedValidator(StndDefinedValidatorResolver resolver) { return resolver.resolve(); } - } diff --git a/src/main/java/org/onap/dcae/configuration/ConfigLoader.java b/src/main/java/org/onap/dcae/configuration/ConfigLoader.java deleted file mode 100644 index eaeb09e3..00000000 --- a/src/main/java/org/onap/dcae/configuration/ConfigLoader.java +++ /dev/null @@ -1,99 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.configuration; - -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.json.JSONObject; -import org.onap.dcae.configuration.cbs.CbsConfigResolver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConfigLoader { - - private static final Logger log = LoggerFactory.getLogger(ConfigLoader.class); - private final ConfigFilesFacade configFilesFacade; - private final CbsConfigResolver cbsConfigResolver; - private final Runnable applicationRestarter; - private boolean isApplicationRestartNeeded; - - ConfigLoader(ConfigFilesFacade configFilesFacade, CbsConfigResolver cbsConfigResolver, Runnable applicationRestarter) { - this.configFilesFacade = configFilesFacade; - this.cbsConfigResolver = cbsConfigResolver; - this.applicationRestarter = applicationRestarter; - this.isApplicationRestartNeeded = false; - } - - public synchronized void updateConfig() { - Option appConfig = cbsConfigResolver.getAppConfig(); - appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage()); - } - - private Runnable logSkipMessage() { - return () -> log.info("Skipping dynamic configuration"); - } - - private void handleUpdate(JSONObject appConfig) { - updatePropertiesIfChanged(appConfig); - updateDmaapConfigIfChanged(appConfig); - restartApplicationIfNeeded(); - } - - private void updatePropertiesIfChanged(JSONObject appConfig) { - Map newProperties = ConfigParsing.getProperties(appConfig); - Map oldProperties = configFilesFacade.readCollectorProperties().get(); - - if (!areCommonPropertiesSame(oldProperties, newProperties)) { - configFilesFacade.writeProperties(newProperties); - isApplicationRestartNeeded = true; - } - } - - private boolean areCommonPropertiesSame(Map oldProperties, Map newProperties) { - Map filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties); - return filteredOldProperties.equals(newProperties); - } - - private Map filterIntersectingKeys(Map primaryProperties, - Map otherProperties) { - return primaryProperties.filterKeys(key -> containsKey(key, otherProperties)); - } - - private boolean containsKey(String key, Map properties) { - return properties.keySet().contains(key); - } - - private void updateDmaapConfigIfChanged(JSONObject appConfig) { - JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get(); - JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get(); - - if (!oldDmaapConfig.similar(newDmaapConfig)) { - configFilesFacade.writeDMaaPConfiguration(newDmaapConfig); - isApplicationRestartNeeded = true; - } - } - - private void restartApplicationIfNeeded() { - if (isApplicationRestartNeeded) { - applicationRestarter.run(); - isApplicationRestartNeeded = false; - } - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java deleted file mode 100644 index e5ea3c59..00000000 --- a/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.configuration; - -import java.nio.file.Path; -import org.onap.dcae.VesApplication; -import org.onap.dcae.configuration.cbs.CbsConfigResolver; -import org.onap.dcae.configuration.cbs.CbsConfigResolverFactory; - -public class ConfigLoaderFactory { - - public static ConfigLoader create(Path propertiesFile, Path dmaapConfigFile) { - ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile); - CbsConfigResolver cbsConfigResolver = new CbsConfigResolverFactory().create(); - return new ConfigLoader( - configFilesFacade, - cbsConfigResolver, - VesApplication::restartApplication); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/ConfigParsing.java b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java index 13deff73..d648332d 100644 --- a/src/main/java/org/onap/dcae/configuration/ConfigParsing.java +++ b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java @@ -47,11 +47,11 @@ interface ConfigParsing { } static Map getProperties(JSONObject configuration) { - log.info(f("Getting properties configuration from app configuration: '%s'", configuration)); + log.debug(f("Getting properties configuration from app configuration: '%s'", configuration)); Map confEntries = toList(configuration.toMap().entrySet().iterator()) .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue()))) .filterKeys(e -> !e.startsWith("streams_publishes")); - log.info(f("Found following app properties: '%s'", confEntries)); + log.debug(f("Found following app properties: '%s'", confEntries)); return confEntries; } -} \ No newline at end of file +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java new file mode 100644 index 00000000..eb1a1a5f --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigUpdater { + + private static final Logger log = LoggerFactory.getLogger(ConfigUpdater.class); + private final ConfigFilesFacade configFilesFacade; + private final Runnable applicationRestarter; + private boolean isApplicationRestartNeeded; + + public ConfigUpdater(ConfigFilesFacade configFilesFacade, Runnable applicationRestarter) { + this.configFilesFacade = configFilesFacade; + this.applicationRestarter = applicationRestarter; + this.isApplicationRestartNeeded = false; + } + + public synchronized void updateConfig(Option appConfig) { + appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage()); + } + + private Runnable logSkipMessage() { + return () -> log.info("Skipping dynamic configuration"); + } + + private void handleUpdate(JSONObject appConfig) { + updatePropertiesIfChanged(appConfig); + updateDmaapConfigIfChanged(appConfig); + restartApplicationIfNeeded(); + } + + private void updatePropertiesIfChanged(JSONObject appConfig) { + Map newProperties = ConfigParsing.getProperties(appConfig); + Map oldProperties = configFilesFacade.readCollectorProperties().get(); + + if (!areCommonPropertiesSame(oldProperties, newProperties)) { + configFilesFacade.writeProperties(newProperties); + isApplicationRestartNeeded = true; + } + } + + private boolean areCommonPropertiesSame(Map oldProperties, Map newProperties) { + Map filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties); + return filteredOldProperties.equals(newProperties); + } + + private Map filterIntersectingKeys(Map primaryProperties, + Map otherProperties) { + return primaryProperties.filterKeys(key -> containsKey(key, otherProperties)); + } + + private boolean containsKey(String key, Map properties) { + return properties.keySet().contains(key); + } + + private void updateDmaapConfigIfChanged(JSONObject appConfig) { + JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get(); + JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get(); + + if (!oldDmaapConfig.similar(newDmaapConfig)) { + configFilesFacade.writeDMaaPConfiguration(newDmaapConfig); + isApplicationRestartNeeded = true; + } + } + + private void restartApplicationIfNeeded() { + if (isApplicationRestartNeeded) { + applicationRestarter.run(); + isApplicationRestartNeeded = false; + } + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java new file mode 100644 index 00000000..866bc20b --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import java.nio.file.Path; +import org.onap.dcae.VesApplication; + +public class ConfigUpdaterFactory { + + private ConfigUpdaterFactory() { + } + + public static ConfigUpdater create(Path propertiesFile, Path dmaapConfigFile) { + ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile); + return new ConfigUpdater( + configFilesFacade, + VesApplication::restartApplication); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java new file mode 100644 index 00000000..ebdf0474 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================= + * VES Collector + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved.s + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +/** + * ConfigurationHandler is responsible for receiving configuration updates from Consul. + * Any change made in the Consul will be reported as a notification. + */ +public class ConfigurationHandler { + + private static Logger log = LoggerFactory.getLogger(ConfigurationHandler.class); + private static final String CONFIG_DICT = "config"; + + private final CbsClientConfigurationProvider cbsClientConfigurationProvider; + private final ConfigUpdater configUpdater; + + /** + * Constructor + * @param cbsClientConfigurationProvider provides configuration to connect with Consul + * @param configUpdater for updating application configuration + */ + public ConfigurationHandler(CbsClientConfigurationProvider cbsClientConfigurationProvider, ConfigUpdater configUpdater) { + this.cbsClientConfigurationProvider = cbsClientConfigurationProvider; + this.configUpdater = configUpdater; + } + + /** + * Start listen for application configuration notifications with configuration changes + * @param interval defines period of time when notification can come + * @return {@link Disposable} handler to close Consul listener at the end + */ + public Disposable startListen(Duration interval) { + + log.info("Start listening for configuration from Consul ..."); + log.info(String.format("Consul configuration will be fetched in %s period.", interval)); + + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + + final CbsRequest request = createCbsRequest(); + final CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationProvider.get(); + + return createCbsClient(cbsClientConfiguration) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)) + .subscribe( + this::handleConfigurationFromConsul, + this::handleError + ); + } + + Mono createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); + } + + void handleConfigurationFromConsul(JsonObject jsonObject) { + log.info("Configuration update from Consul {}", jsonObject); + if(jsonObject.has(CONFIG_DICT)) { + JsonObject config = jsonObject.getAsJsonObject(CONFIG_DICT); + JSONObject jObject = new JSONObject(config.toString()); + configUpdater.updateConfig(Option.of(jObject)); + } else { + throw new IllegalArgumentException(String.format("Invalid application configuration: %s ", jsonObject)); + } + } + + private void handleError(Throwable throwable) { + log.error("Unexpected error occurred during fetching configuration from Consul", throwable); + } + + private CbsRequest createCbsRequest() { + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + return CbsRequests.getAll(diagnosticContext); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java new file mode 100644 index 00000000..fc88197c --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration.cbs; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CbsClientConfigurationProvider is used to provide production or dev configuration for CBS client. + */ +public class CbsClientConfigurationProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationProvider.class); + + private static final String DEFAULT_PROTOCOL = "http"; + private static final String DEFAULT_HOSTNAME = "config-binding-service"; + private static final int DEFAULT_PORT = 10000; + private static final String DEFAULT_APP_NAME = "dcae-ves-collector"; + private static final String DEV_MODE_PROPERTY = "devMode"; + private static final String CBS_PORT_PROPERTY = "cbsPort"; + + /** + * Returns configuration for CBS client. + * @return Production or dev configuration for CBS client, depends on application run arguments. + */ + public CbsClientConfiguration get() { + try { + if (isDevModeEnabled()) { + return getDevConfiguration(); + } else { + return CbsClientConfiguration.fromEnvironment(); + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed resolving CBS client configuration from system environments: %s", e)); + } + return getFallbackConfiguration(); + } + + @NotNull + private ImmutableCbsClientConfiguration getDevConfiguration() { + return createCbsClientConfiguration( + DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, + Integer.parseInt(System.getProperty(CBS_PORT_PROPERTY, String.valueOf(DEFAULT_PORT))) + ); + } + + private boolean isDevModeEnabled() { + return System.getProperty(DEV_MODE_PROPERTY) != null; + } + + private ImmutableCbsClientConfiguration getFallbackConfiguration() { + LOGGER.info("Falling back to use default CBS client configuration"); + return createCbsClientConfiguration(DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, DEFAULT_PORT); + } + + private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname, + String appName, Integer port) { + return ImmutableCbsClientConfiguration.builder() + .protocol(protocol) + .hostname(hostname) + .port(port) + .appName(appName) + .build(); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java deleted file mode 100644 index dc02131e..00000000 --- a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java +++ /dev/null @@ -1,60 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.configuration.cbs; - -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class CbsClientConfigurationResolver { - - private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationResolver.class); - - private final String defaultProtocol = "http"; - private final String defaultHostname = "config-binding-service"; - private final int defaultPort = 10000; - private final String defaultAppName = "dcae-ves-collector"; - - CbsClientConfiguration resolveCbsClientConfiguration() { - try { - return CbsClientConfiguration.fromEnvironment(); - } catch (Exception e) { - LOGGER.warn("Failed resolving CBS client configuration from system environments: " + e); - } - LOGGER.info("Falling back to use default CBS client configuration properties"); - return getFallbackConfiguration(); - } - - private ImmutableCbsClientConfiguration getFallbackConfiguration() { - LOGGER.info("Falling back to use default CBS client configuration"); - return createCbsClientConfiguration(defaultProtocol, defaultHostname, defaultAppName, defaultPort); - } - - private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname, - String appName, Integer port) { - return ImmutableCbsClientConfiguration.builder() - .protocol(protocol) - .hostname(hostname) - .port(port) - .appName(appName) - .build(); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java deleted file mode 100644 index 09a96985..00000000 --- a/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java +++ /dev/null @@ -1,58 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.configuration.cbs; - -import com.google.gson.JsonObject; -import io.vavr.control.Option; -import org.json.JSONObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - - -public class CbsConfigResolver { - - private static final Logger log = LoggerFactory.getLogger(CbsConfigResolver.class); - - private final CbsClientConfigurationResolver cbsClientConfigurationResolver; - private final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); - private final CbsRequest cbsConfigurationRequest = CbsRequests.getConfiguration(diagnosticContext); - - CbsConfigResolver(CbsClientConfigurationResolver cbsClientConfigurationResolver) { - this.cbsClientConfigurationResolver = cbsClientConfigurationResolver; - } - - public Option getAppConfig() { - JsonObject emptyJson = new JsonObject(); - CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationResolver.resolveCbsClientConfiguration(); - JsonObject jsonObject = CbsClientFactory.createCbsClient(cbsClientConfiguration) - .flatMap(cbsClient -> cbsClient.get(cbsConfigurationRequest)) - .doOnError(error -> log.warn("Failed to fetch configuration from CBS " + error.getMessage())) - .onErrorReturn(emptyJson) - .block(); - - return emptyJson.equals(jsonObject) ? Option.none() : Option.of(new JSONObject(jsonObject.toString())); - } -} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java deleted file mode 100644 index 5e42d9ec..00000000 --- a/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2020 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.configuration.cbs; - -public class CbsConfigResolverFactory { - - public CbsConfigResolver create() { - CbsClientConfigurationResolver resolver = new CbsClientConfigurationResolver(); - return new CbsConfigResolver(resolver); - } -} \ No newline at end of file -- cgit 1.2.3-korg