diff options
author | Pawel <pawel.kasperkiewicz@nokia.com> | 2020-07-30 16:30:39 +0200 |
---|---|---|
committer | Pawel <pawel.kasperkiewicz@nokia.com> | 2020-08-26 08:42:44 +0200 |
commit | 50592c30fed4908bbf860f9018cc5f94f8bd2303 (patch) | |
tree | 151aefb0b21e3fbabc89c39a5731643bbbdd4edb /src/main/java | |
parent | 79efedef7af08038b49821c410db1b11e038f653 (diff) |
Config fetch for VESCollector through DCAE-SDK (CBS Client)
Issue-ID: DCAEGEN2-2212
Signed-off-by: Pawel <pawel.kasperkiewicz@nokia.com>
Change-Id: I25072b340b5c9f2f538d39e5befb1331804b7bba
Signed-off-by: Pawel <pawel.kasperkiewicz@nokia.com>
Diffstat (limited to 'src/main/java')
15 files changed, 340 insertions, 516 deletions
diff --git a/src/main/java/org/onap/dcae/VesApplication.java b/src/main/java/org/onap/dcae/VesApplication.java index bb785dbd..e5ee6e35 100644 --- a/src/main/java/org/onap/dcae/VesApplication.java +++ b/src/main/java/org/onap/dcae/VesApplication.java @@ -22,16 +22,13 @@ package org.onap.dcae; import io.vavr.collection.Map; -import java.nio.file.Paths; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.onap.dcae.common.EventSender; import org.onap.dcae.common.validator.StndDefinedValidatorResolver; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; -import org.onap.dcae.common.publishing.EventPublisher; +import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.onap.dcae.common.publishing.PublisherConfig; -import org.onap.dcae.controller.ConfigLoader; +import org.onap.dcae.configuration.ConfigLoader; +import org.onap.dcae.configuration.ConfigLoaderFactory; import org.onap.dcaegen2.services.sdk.services.external.schema.manager.service.StndDefinedValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,74 +41,71 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; +import java.nio.file.Paths; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class VesApplication { private static final Logger incomingRequestsLogger = LoggerFactory.getLogger("org.onap.dcae.common.input"); - private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); private static final Logger errorLog = LoggerFactory.getLogger("org.onap.dcae.common.error"); private static ApplicationSettings applicationSettings; private static ConfigurableApplicationContext context; private static ConfigLoader configLoader; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; - private static SpringApplication app; - private static EventPublisher eventPublisher; + private static DMaaPEventPublisher eventPublisher; private static ScheduledFuture<?> scheduleFeatures; public static void main(String[] args) { - app = new SpringApplication(VesApplication.class); - applicationSettings = new ApplicationSettings(args, CLIUtils::processCmdLine); - scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); - init(); - app.setAddCommandLineProperties(true); - context = app.run(); - configLoader.updateConfig(); + SpringApplication app = new SpringApplication(VesApplication.class); + applicationSettings = new ApplicationSettings(args, CLIUtils::processCmdLine); + scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + init(); + app.setAddCommandLineProperties(true); + context = app.run(); + configLoader.updateConfig(); } public static void restartApplication() { - Thread thread = new Thread(() -> { - context.close(); - applicationSettings.reloadProperties(); - scheduleFeatures.cancel(true); - init(); - context = SpringApplication.run(VesApplication.class); - }); - thread.setDaemon(false); - thread.start(); + Thread thread = new Thread(() -> { + context.close(); + applicationSettings.reloadProperties(); + scheduleFeatures.cancel(true); + init(); + context = SpringApplication.run(VesApplication.class); + }); + thread.setDaemon(false); + thread.start(); } private static void init() { - createConfigLoader(); - createSchedulePoolExecutor(); - createExecutors(); + createConfigLoader(); + createSchedulePoolExecutor(); + createExecutors(); } private static void createExecutors() { - eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); + eventPublisher = new DMaaPEventPublisher(getDmaapConfig()); } private static void createSchedulePoolExecutor() { - scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, - applicationSettings.configurationUpdateFrequency(), - applicationSettings.configurationUpdateFrequency(), - TimeUnit.MINUTES); + scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, + applicationSettings.configurationUpdateFrequency(), + applicationSettings.configurationUpdateFrequency(), + TimeUnit.MINUTES); } private static void createConfigLoader() { - configLoader = ConfigLoader.create(getEventPublisher()::reconfigure, - Paths.get(applicationSettings.dMaaPConfigurationFileLocation()), - applicationSettings.configurationFileLocation()); - } - - - private static EventPublisher getEventPublisher() { - return EventPublisher.createPublisher(oplog, DMaaPConfigurationParser - .parseToDomainMapping(Paths.get(applicationSettings.dMaaPConfigurationFileLocation())).get()); + configLoader = ConfigLoaderFactory.create( + applicationSettings.configurationFileLocation(), + Paths.get(applicationSettings.dMaaPConfigurationFileLocation())); } - private static Map<String, PublisherConfig> getDmapConfig() { - return DMaaPConfigurationParser - .parseToDomainMapping(Paths.get(applicationSettings.dMaaPConfigurationFileLocation())).get(); + private static Map<String, PublisherConfig> getDmaapConfig() { + return DMaaPConfigurationParser + .parseToDomainMapping(Paths.get(applicationSettings.dMaaPConfigurationFileLocation())).get(); } @Bean diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 63be9106..1ec918c9 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -3,7 +3,7 @@ * VES Collector * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2020 Nokia. All rights reserved. + * Copyright (C) 2018,2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import org.onap.dcae.common.model.VesEvent; -import org.onap.dcae.common.publishing.EventPublisher; +import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +35,10 @@ public class EventSender { private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics"); private Map<String, String[]> streamIdToDmaapIds; - private EventPublisher eventPublisher; + private DMaaPEventPublisher eventPublisher; private static final Logger log = LoggerFactory.getLogger(EventSender.class); - public EventSender(EventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) { + public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) { this.eventPublisher = eventPublisher; this.streamIdToDmaapIds = streamIdToDmaapIds; } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 3fc9e254..4e9aabc7 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ -class DMaaPEventPublisher implements EventPublisher { +public class DMaaPEventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final String EVENT = "event"; @@ -47,15 +47,16 @@ class DMaaPEventPublisher implements EventPublisher { private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; - private final Logger outputLogger; + private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); - DMaaPEventPublisher(DMaaPPublishersCache publishersCache, - Logger outputLogger) { + DMaaPEventPublisher(DMaaPPublishersCache publishersCache) { this.publishersCache = publishersCache; - this.outputLogger = outputLogger; } - @Override + public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) { + this(new DMaaPPublishersCache(dMaaPConfig)); + } + public void sendEvent(JSONObject event, String domain) { clearVesUniqueIdFromEvent(event); publishersCache.getPublisher(domain) @@ -64,11 +65,6 @@ class DMaaPEventPublisher implements EventPublisher { .forEach(publisher -> sendEvent(event, domain, publisher)); } - @Override - public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) { - publishersCache.reconfigure(dMaaPConfig); - } - private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { Try.run(() -> uncheckedSendEvent(event, domain, publisher)) .onFailure(exc -> closePublisher(event, domain, exc)); diff --git a/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java index 0b2c197d..c0280c75 100644 --- a/src/main/java/org/onap/dcae/controller/ConfigFilesFacade.java +++ b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,37 +18,38 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.controller; +package org.onap.dcae.configuration; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.common.publishing.VavrUtils.logError; +import static org.onap.dcae.configuration.Conversions.toList; import io.vavr.CheckedRunnable; import io.vavr.Tuple2; import io.vavr.collection.Map; import io.vavr.control.Try; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; - -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.*; -import static org.onap.dcae.controller.Conversions.toList; - class ConfigFilesFacade { - private static Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class); + private static final Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class); private final Path dMaaPConfigPath; private final Path propertiesPath; - public ConfigFilesFacade(Path dMaaPConfigPath, Path propertiesPath) { - this.dMaaPConfigPath = dMaaPConfigPath; + ConfigFilesFacade(Path propertiesPath, Path dMaaPConfigPath) { this.propertiesPath = propertiesPath; + this.dMaaPConfigPath = dMaaPConfigPath; } Try<Map<String, String>> readCollectorProperties() { @@ -126,5 +127,4 @@ class ConfigFilesFacade { private String indentConfiguration(String configuration) { return new JSONObject(configuration).toString(4); } - -} +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/ConfigLoader.java b/src/main/java/org/onap/dcae/configuration/ConfigLoader.java new file mode 100644 index 00000000..eaeb09e3 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigLoader.java @@ -0,0 +1,99 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.onap.dcae.configuration.cbs.CbsConfigResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigLoader { + + private static final Logger log = LoggerFactory.getLogger(ConfigLoader.class); + private final ConfigFilesFacade configFilesFacade; + private final CbsConfigResolver cbsConfigResolver; + private final Runnable applicationRestarter; + private boolean isApplicationRestartNeeded; + + ConfigLoader(ConfigFilesFacade configFilesFacade, CbsConfigResolver cbsConfigResolver, Runnable applicationRestarter) { + this.configFilesFacade = configFilesFacade; + this.cbsConfigResolver = cbsConfigResolver; + this.applicationRestarter = applicationRestarter; + this.isApplicationRestartNeeded = false; + } + + public synchronized void updateConfig() { + Option<JSONObject> appConfig = cbsConfigResolver.getAppConfig(); + appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage()); + } + + private Runnable logSkipMessage() { + return () -> log.info("Skipping dynamic configuration"); + } + + private void handleUpdate(JSONObject appConfig) { + updatePropertiesIfChanged(appConfig); + updateDmaapConfigIfChanged(appConfig); + restartApplicationIfNeeded(); + } + + private void updatePropertiesIfChanged(JSONObject appConfig) { + Map<String, String> newProperties = ConfigParsing.getProperties(appConfig); + Map<String, String> oldProperties = configFilesFacade.readCollectorProperties().get(); + + if (!areCommonPropertiesSame(oldProperties, newProperties)) { + configFilesFacade.writeProperties(newProperties); + isApplicationRestartNeeded = true; + } + } + + private boolean areCommonPropertiesSame(Map<String, String> oldProperties, Map<String, String> newProperties) { + Map<String, String> filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties); + return filteredOldProperties.equals(newProperties); + } + + private Map<String, String> filterIntersectingKeys(Map<String, String> primaryProperties, + Map<String, String> otherProperties) { + return primaryProperties.filterKeys(key -> containsKey(key, otherProperties)); + } + + private boolean containsKey(String key, Map<String, String> properties) { + return properties.keySet().contains(key); + } + + private void updateDmaapConfigIfChanged(JSONObject appConfig) { + JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get(); + JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get(); + + if (!oldDmaapConfig.similar(newDmaapConfig)) { + configFilesFacade.writeDMaaPConfiguration(newDmaapConfig); + isApplicationRestartNeeded = true; + } + } + + private void restartApplicationIfNeeded() { + if (isApplicationRestartNeeded) { + applicationRestarter.run(); + isApplicationRestartNeeded = false; + } + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java new file mode 100644 index 00000000..e5ea3c59 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigLoaderFactory.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import java.nio.file.Path; +import org.onap.dcae.VesApplication; +import org.onap.dcae.configuration.cbs.CbsConfigResolver; +import org.onap.dcae.configuration.cbs.CbsConfigResolverFactory; + +public class ConfigLoaderFactory { + + public static ConfigLoader create(Path propertiesFile, Path dmaapConfigFile) { + ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile); + CbsConfigResolver cbsConfigResolver = new CbsConfigResolverFactory().create(); + return new ConfigLoader( + configFilesFacade, + cbsConfigResolver, + VesApplication::restartApplication); + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/controller/ConfigParsing.java b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java index e1644222..13deff73 100644 --- a/src/main/java/org/onap/dcae/controller/ConfigParsing.java +++ b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.controller; +package org.onap.dcae.configuration; import static io.vavr.API.Try; import static io.vavr.API.Tuple; import static org.onap.dcae.common.publishing.VavrUtils.f; -import static org.onap.dcae.controller.Conversions.toList; +import static org.onap.dcae.configuration.Conversions.toList; import io.vavr.collection.Map; import io.vavr.control.Option; @@ -54,5 +54,4 @@ interface ConfigParsing { log.info(f("Found following app properties: '%s'", confEntries)); return confEntries; } - -} +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/controller/Conversions.java b/src/main/java/org/onap/dcae/configuration/Conversions.java index e8f7cc0c..85c3f78d 100644 --- a/src/main/java/org/onap/dcae/controller/Conversions.java +++ b/src/main/java/org/onap/dcae/configuration/Conversions.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2018,2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.controller; +package org.onap.dcae.configuration; import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.Spliterator; import java.util.Spliterators; import java.util.stream.StreamSupport; -import org.json.JSONArray; import org.json.JSONObject; /** @@ -41,13 +40,8 @@ interface Conversions { .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody)); } - static Try<JSONArray> toJsonArray(String strBody) { - return API.Try(() -> new JSONArray(strBody)) - .mapFailure(enhanceError("Value '%s' is not a valid JSON array", strBody)); - } - static <T> List<T> toList(Iterator<T> iterator) { - return List.ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)); + return List + .ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)); } - -} +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java new file mode 100644 index 00000000..360e0a8f --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationResolver.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration.cbs; + +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CbsClientConfigurationResolver { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationResolver.class); + + private final String defaultProtocol = "http"; + private final String defaultHostname = "config-binding-service"; + private final int defaultPort = 10000; + private final String defaultAppName = "dcae-ves-collector"; + + CbsClientConfiguration resolveCbsClientConfiguration() { + try { + return CbsClientConfiguration.fromEnvironment(); + } catch (Exception e) { + LOGGER.warn("Failed resolving CBS client configuration from system environments: " + e); + } + LOGGER.info("Falling back to use default CBS client configuration properties"); + return getFallbackConfiguration(); + } + + private ImmutableCbsClientConfiguration getFallbackConfiguration() { + LOGGER.info("Falling back to use default CBS client configuration"); + return createCbsClientConfiguration(defaultProtocol, defaultHostname, defaultAppName, defaultPort); + } + + private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname, + String appName, Integer port) { + return ImmutableCbsClientConfiguration.builder() + .protocol(protocol) + .hostname(hostname) + .port(port) + .appName(appName) + .build(); + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java new file mode 100644 index 00000000..5a66c294 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolver.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration.cbs; + +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import java.util.function.Consumer; +import org.json.JSONObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +public class CbsConfigResolver { + + private static final Logger log = LoggerFactory.getLogger(CbsConfigResolver.class); + + private final CbsClientConfiguration cbsClientConfiguration; + private final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + private final CbsRequest cbsConfigurationRequest = CbsRequests.getConfiguration(diagnosticContext); + + CbsConfigResolver(CbsClientConfiguration cbsClientConfiguration) { + this.cbsClientConfiguration = cbsClientConfiguration; + } + + public Option<JSONObject> getAppConfig() { + JsonObject emptyJson = new JsonObject(); + JsonObject jsonObject = CbsClientFactory.createCbsClient(cbsClientConfiguration) + .flatMap(cbsClient -> cbsClient.get(cbsConfigurationRequest)) + .doOnError(error -> log.warn("Failed to fetch configuration from CBS " + error.getMessage())) + .onErrorReturn(emptyJson) + .block(); + + return emptyJson.equals(jsonObject) ? Option.none() : Option.of(new JSONObject(jsonObject.toString())); + } +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java index 42e721a8..aa3ee00f 100644 --- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsConfigResolverFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * org.onap.dcaegen2.collectors.ves * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,22 +17,15 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.common.publishing; +package org.onap.dcae.configuration.cbs; -import io.vavr.collection.Map; -import org.json.JSONObject; -import org.slf4j.Logger; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -public interface EventPublisher { +public class CbsConfigResolverFactory { - static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) { - return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + public CbsConfigResolver create() { + CbsClientConfigurationResolver resolver = new CbsClientConfigurationResolver(); + CbsClientConfiguration cbsClientConfiguration = resolver.resolveCbsClientConfiguration(); + return new CbsConfigResolver(cbsClientConfiguration); } - - void sendEvent(JSONObject event, String domain); - - void reconfigure(Map<String, PublisherConfig> dMaaPConfig); -} +}
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/controller/ConfigLoader.java b/src/main/java/org/onap/dcae/controller/ConfigLoader.java deleted file mode 100644 index dbf52823..00000000 --- a/src/main/java/org/onap/dcae/controller/ConfigLoader.java +++ /dev/null @@ -1,144 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.controller; - -import static org.onap.dcae.common.publishing.DMaaPConfigurationParser.parseToDomainMapping; -import static org.onap.dcae.controller.ConfigParsing.getDMaaPConfig; -import static org.onap.dcae.controller.ConfigParsing.getProperties; -import static org.onap.dcae.controller.EnvPropertiesReader.readEnvProps; - -import io.vavr.Function0; -import io.vavr.Function1; -import io.vavr.collection.HashMap; -import io.vavr.collection.Map; -import io.vavr.control.Try; -import java.nio.file.Path; -import java.util.function.Consumer; -import org.json.JSONObject; -import org.onap.dcae.VesApplication; -import org.onap.dcae.common.publishing.PublisherConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConfigLoader { - - private static final String SKIP_MSG = "Skipping dynamic configuration update"; - private static Logger log = LoggerFactory.getLogger(ConfigLoader.class); - private final Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer; - private final ConfigFilesFacade configFilesFacade; - private final Function1<EnvProps, Try<JSONObject>> configurationSource; - private final Function0<Map<String, String>> envVariablesSupplier; - private boolean toRestart = false; - - ConfigLoader(Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer, - ConfigFilesFacade configFilesFacade, - Function1<EnvProps, Try<JSONObject>> configurationSource, - Function0<Map<String, String>> envVariablesSupplier) { - this.eventPublisherReconfigurer = eventPublisherReconfigurer; - this.configFilesFacade = configFilesFacade; - this.configurationSource = configurationSource; - this.envVariablesSupplier = envVariablesSupplier; - } - - public static ConfigLoader create( - Consumer<Map<String, PublisherConfig>> eventPublisherReconfigurer, - Path dMaaPConfigFile, Path propertiesConfigFile) { - return new ConfigLoader(eventPublisherReconfigurer, - new ConfigFilesFacade(dMaaPConfigFile, propertiesConfigFile), - ConfigSource::getAppConfig, - () -> HashMap.ofAll(System.getenv())); - } - - public void updateConfig() { - log.info("Trying to dynamically update config from Config Binding Service"); - readEnvProps(envVariablesSupplier.get()) - .onEmpty(() -> log.warn(SKIP_MSG)).forEach(this::updateConfig); - } - - private void updateConfig(EnvProps props) { - configurationSource.apply(props) - .onFailure(logSkip()) - .onSuccess(newConf -> { - updateConfigurationProperties(newConf); - updateDMaaPProperties(newConf); - reloadApplication(); - } - ); - } - - private void reloadApplication() { - if(toRestart){ - log.info("New app config - Application will be restarted"); - VesApplication.restartApplication(); - } - } - - private void updateDMaaPProperties(JSONObject newConf) { - configFilesFacade.readDMaaPConfiguration() - .onFailure(logSkip()) - .onSuccess(oldDMaaPConf -> getDMaaPConfig(newConf) - .onEmpty(() -> log.warn(SKIP_MSG)) - .forEach(newDMaaPConf -> compareAndOverwriteDMaaPConfig(oldDMaaPConf, newDMaaPConf))); - } - - - private void updateConfigurationProperties(JSONObject newConf) { - configFilesFacade.readCollectorProperties() - .onFailure(logSkip()) - .onSuccess(oldProps -> compareAndOverwritePropertiesConfig(newConf, oldProps)); - } - - private void compareAndOverwritePropertiesConfig(JSONObject newConf, Map<String, String> oldProps) { - Map<String, String> newProperties = getProperties(newConf); - Map<String, String> result = oldProps.filterKeys((s) -> newProperties.keySet().contains(s)); - if (!result.equals(newProperties)) { - configFilesFacade.writeProperties(newProperties) - .onSuccess(__ -> { - toRestart= true; - log.info("New properties configuration written to file"); - }) - .onFailure(logSkip()); - } else { - log.info("Collector properties from CBS are the same as currently used ones. " + SKIP_MSG); - } - } - - private void compareAndOverwriteDMaaPConfig(JSONObject oldDMaaPConf, JSONObject newDMaaPConf) { - if (!oldDMaaPConf.toString().equals(newDMaaPConf.toString())) { - parseToDomainMapping(newDMaaPConf) - .onFailure(exc -> log.error(SKIP_MSG, exc)) - .onSuccess(eventPublisherReconfigurer) - .onSuccess(parsedConfig -> - configFilesFacade.writeDMaaPConfiguration(newDMaaPConf) - .onFailure(logSkip()) - .onSuccess(__ -> { - toRestart= true; - log.info("New dMaaP configuration written to file"); - })); - } else { - log.info("DMaaP config from CBS is the same as currently used one. " + SKIP_MSG); - } - } - - private Consumer<Throwable> logSkip() { - return __ -> log.error(SKIP_MSG); - } -} diff --git a/src/main/java/org/onap/dcae/controller/ConfigSource.java b/src/main/java/org/onap/dcae/controller/ConfigSource.java deleted file mode 100644 index a9e439e4..00000000 --- a/src/main/java/org/onap/dcae/controller/ConfigSource.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.controller; - -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; -import static org.onap.dcae.common.publishing.VavrUtils.f; -import static org.onap.dcae.controller.Conversions.toJson; -import static org.onap.dcae.controller.Conversions.toJsonArray; - -import com.mashape.unirest.http.HttpResponse; -import com.mashape.unirest.http.Unirest; -import io.vavr.control.Try; -import org.json.JSONArray; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class ConfigSource { - - private static final Logger log = LoggerFactory.getLogger(ConfigSource.class); - - static Try<JSONObject> getAppConfig(EnvProps envProps) { - log.info("Fetching app configuration from CBS"); - return callConsulForCBSConfiguration(envProps) - .peek(strBody -> log.info(f("Received following CBS configuration from Consul '%s'", strBody))) - .flatMap(Conversions::toJsonArray) - .flatMap(ConfigSource::withdrawCatalog) - .flatMap(json -> constructFullCBSUrl(envProps, json)) - .flatMap(cbsUrl -> callCBSForAppConfig(envProps, cbsUrl)) - .flatMap(Conversions::toJson) - .peek(jsonNode -> log.info(f("Received app configuration: '%s'", jsonNode))) - .onFailure(exc -> log.error("Could not fetch application config", exc)); - } - - private static Try<String> callConsulForCBSConfiguration(EnvProps envProps) { - return executeGet(envProps.consulProtocol + "://" + envProps.consulHost + ":" + - envProps.consulPort + "/v1/catalog/service/" + envProps.cbsName) - .mapFailure(enhanceError("Unable to retrieve CBS configuration from Consul")); - } - - private static Try<String> constructFullCBSUrl(EnvProps envProps, JSONObject json) { - return Try(() -> envProps.cbsProtocol + "://" + json.get("ServiceAddress").toString() + ":" + - json.get("ServicePort").toString()) - .mapFailure(enhanceError("ServiceAddress / ServicePort missing from CBS conf: '%s'", json)); - } - - private static Try<JSONObject> withdrawCatalog(JSONArray json) { - return Try(() -> new JSONObject(json.get(0).toString())) - .mapFailure(enhanceError("CBS response '%s' is in invalid format," - + " most probably is it not a list of configuration objects", json)); - } - - private static Try<String> callCBSForAppConfig(EnvProps envProps, String cbsUrl) { - log.info("Calling CBS for application config"); - return executeGet(cbsUrl + "/service_component/" + envProps.appName) - .mapFailure(enhanceError("Unable to fetch configuration from CBS")); - } - - - private static Try<String> executeGet(String url) { - log.info(f("Calling HTTP GET on url: '%s'", url)); - return Try(() -> Unirest.get(url).asString()) - .mapFailure(enhanceError("Http call (GET '%s') failed.", url)) - .filter( - res -> res.getStatus() == 200, - res -> new RuntimeException(f("HTTP call (GET '%s') failed with status %s and body '%s'", - url, res.getStatus(), res.getBody()))) - .map(HttpResponse::getBody) - .peek(body -> log.info(f("HTTP GET on '%s' returned body '%s'", url, body))); - } - -} diff --git a/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java b/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java deleted file mode 100644 index 319caa65..00000000 --- a/src/main/java/org/onap/dcae/controller/EnvPropertiesReader.java +++ /dev/null @@ -1,94 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.controller; - -import io.vavr.collection.Map; -import io.vavr.control.Option; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static io.vavr.API.List; -import static io.vavr.API.Try; -import static org.onap.dcae.common.publishing.VavrUtils.f; - -final class EnvPropertiesReader { - - private final static Logger log = LoggerFactory.getLogger(EnvPropertiesReader.class); - - static Option<EnvProps> readEnvProps(Map<String, String> environmentVariables) { - log.info("Loading necessary environment variables for dynamic configuration update"); - int consulPort = getConsulPort(environmentVariables); - String consulProtocol = getConsulProtocol(environmentVariables); - String cbsProtocol = getCbsProtocol(environmentVariables); - Option<String> consulHost = getConsulHost(environmentVariables); - Option<String> cbsServiceName = getCBSName(environmentVariables); - Option<String> vesCollectorAppName = getAppName(environmentVariables); - return Option.sequence(List(consulHost, cbsServiceName, vesCollectorAppName)) - .map(e -> new EnvProps(consulProtocol, e.get(0), consulPort, cbsProtocol, e.get(1), e.get(2))) - .onEmpty(() -> log.warn("Some required environment variables are missing")) - .peek(props -> log.info(f("Discovered following environment variables: '%s'", props))); - } - - private static Option<String> getAppName(Map<String, String> environmentVariables) { - return environmentVariables.get("HOSTNAME") - .orElse(environmentVariables.get("SERVICE_NAME")) - .onEmpty(() -> log.warn("App service name (as registered in CBS) (env var: 'HOSTNAME' / 'SERVICE_NAME') " - + "is missing error environment variables.")); - } - - private static Option<String> getCBSName(Map<String, String> environmentVariables) { - return environmentVariables.get("CONFIG_BINDING_SERVICE") - .onEmpty(() -> log.warn("Name of CBS Service (as registered in Consul) (env var: 'CONFIG_BINDING_SERVICE') " - + "is missing from environment variables.")); - } - - private static Integer getConsulPort(Map<String, String> environmentVariables) { - return environmentVariables.get("CONSUL_PORT") - .flatMap(str -> Try(() -> Integer.valueOf(str)) - .onFailure(exc -> log.warn("Consul port is not an integer value", exc)) - .toOption()) - .onEmpty(() -> log.warn("Consul port (env var: 'CONSUL_PORT') is missing from environment variables. " - + "Using default value of 8500")) - .getOrElse(8500); - } - - private static Option<String> getConsulHost(Map<String, String> environmentVariables) { - return environmentVariables.get("CONSUL_HOST") - .onEmpty(() -> log.warn("Consul host (env var: 'CONSUL_HOST') (without port) " - + "is missing from environment variables.")); - } - - private static String getConsulProtocol(Map<String, String> environmentVariables) { - return getProtocolFrom("CONSUL_PROTOCOL", environmentVariables); - } - - private static String getCbsProtocol(Map<String, String> environmentVariables) { - return getProtocolFrom("CBS_PROTOCOL", environmentVariables); - } - - private static String getProtocolFrom(String variableName, Map<String, String> environmentVariables) { - return environmentVariables.get(variableName) - .onEmpty(() -> log.warn("Consul protocol (env var: '" + variableName + "') is missing " - + "from environment variables.")) - .getOrElse("http"); - } - -} diff --git a/src/main/java/org/onap/dcae/controller/EnvProps.java b/src/main/java/org/onap/dcae/controller/EnvProps.java deleted file mode 100644 index 5f7d08d5..00000000 --- a/src/main/java/org/onap/dcae/controller/EnvProps.java +++ /dev/null @@ -1,78 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.controller; - -import java.util.Objects; - -/** - * @author Pawel Szalapski (pawel.szalapski@nokia.com) - */ -final class EnvProps { - - final String consulProtocol; - final String consulHost; - final int consulPort; - final String cbsName; - final String cbsProtocol; - final String appName; - - EnvProps(String consulProtocol, String consulHost, int consulPort, String cbsProtocol, String cbsName, String appName) { - this.consulProtocol = consulProtocol; - this.consulHost = consulHost; - this.consulPort = consulPort; - this.cbsProtocol = cbsProtocol; - this.cbsName = cbsName; - this.appName = appName; - } - - @Override - public String toString() { - return "EnvProps{" + - "consulProtocol='" + consulProtocol + '\'' + - ", consulHost='" + consulHost + '\'' + - ", consulPort=" + consulPort + - ", cbsProtocol='" + cbsProtocol + '\'' + - ", cbsName='" + cbsName + '\'' + - ", appName='" + appName + '\'' + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - EnvProps envProps = (EnvProps) o; - return consulPort == envProps.consulPort && - Objects.equals(consulProtocol, envProps.consulProtocol) && - Objects.equals(consulHost, envProps.consulHost) && - Objects.equals(cbsProtocol, envProps.cbsProtocol) && - Objects.equals(cbsName, envProps.cbsName) && - Objects.equals(appName, envProps.appName); - } - - @Override - public int hashCode() { - return Objects.hash(consulProtocol, consulHost, consulPort, cbsProtocol, cbsName, appName); - } -}
\ No newline at end of file |