From a72ce419bce29ba8cf421eb74afad730d7cf374b Mon Sep 17 00:00:00 2001 From: Mukesh Paliwal Date: Wed, 19 Jan 2022 17:43:21 +0530 Subject: RESTConf - Switch CBS client library to 1.8.7 or higher Issue-ID: DCAEGEN2-2857 Signed-off-by: Mukesh Paliwal Change-Id: I3242a29e2e09fae65dd1738b6883970ce436bf2a --- Changelog.md | 3 + pom.xml | 41 ++++-- .../dcae/ApplicationConfigurationListener.java | 108 ++++++++++++++++ .../java/org/onap/dcae/ApplicationSettings.java | 7 +- src/main/java/org/onap/dcae/RestConfCollector.java | 113 ++++++++-------- .../common/publishing/DMaaPEventPublisher.java | 12 +- .../publishing/DmaapRequestConfiguration.java | 123 ++++++++++++++++++ .../dcae/common/publishing/EventPublisher.java | 4 +- .../publishing/MessageRouterHttpStatusMapper.java | 111 ++++++++++++++++ .../org/onap/dcae/common/publishing/Publisher.java | 74 +++++++++++ .../dcae/common/publishing/PublisherConfig.java | 6 +- .../onap/dcae/configuration/ConfigFilesFacade.java | 142 +++++++++++++++++++++ .../org/onap/dcae/configuration/ConfigParsing.java | 56 ++++++++ .../org/onap/dcae/configuration/ConfigUpdater.java | 118 +++++++++++++++++ .../dcae/configuration/ConfigUpdaterFactory.java | 45 +++++++ .../dcae/configuration/ConfigurationHandler.java | 114 +++++++++++++++++ .../org/onap/dcae/configuration/Conversions.java | 47 +++++++ .../cbs/CbsClientConfigurationProvider.java | 85 ++++++++++++ .../java/org/onap/dcae/restapi/ApiException.java | 26 +++- .../dcae/ApplicationConfigurationListenerTest.java | 67 ++++++++++ .../common/publishing/DMaaPEventPublisherTest.java | 83 ------------ version.properties | 2 +- 22 files changed, 1224 insertions(+), 163 deletions(-) create mode 100755 src/main/java/org/onap/dcae/ApplicationConfigurationListener.java create mode 100755 src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java create mode 100755 src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java create mode 100755 src/main/java/org/onap/dcae/common/publishing/Publisher.java create mode 100755 src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java create mode 100755 src/main/java/org/onap/dcae/configuration/ConfigParsing.java create mode 100755 src/main/java/org/onap/dcae/configuration/ConfigUpdater.java create mode 100755 src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java create mode 100755 src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java create mode 100755 src/main/java/org/onap/dcae/configuration/Conversions.java create mode 100755 src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java create mode 100755 src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java delete mode 100644 src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java diff --git a/Changelog.md b/Changelog.md index d17a970..8c8d5c7 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.3.1] - 2022/01/20 + - [DCAEGEN2-2857] - RESTConf - Switch CBS client library to 1.8.7 or higher + ## [1.3.0] - 2022/01/18 - [DCAEGEN2-2962] - Switch RESTCONF Collector to Integration base image - [DCAEGEN2-3022] - Remediation for Log4Shell vulnerability (upgrade to 2.17.1) diff --git a/pom.xml b/pom.xml index fce456e..d0ba10b 100644 --- a/pom.xml +++ b/pom.xml @@ -23,11 +23,11 @@ limitations under the License. org.onap.oparent oparent 2.0.0 - + org.onap.dcaegen2.collectors.restconf restconfcollector - 1.3.0-SNAPSHOT + 1.3.1-SNAPSHOT dcaegen2-collectors-restconf RestConfCollector @@ -48,9 +48,7 @@ limitations under the License. content/repositories/releases/ content/sites/site/org/onap/dcaegen2/collectors/restconf/${project.artifactId}/${project.version} yyyyMMdd'T'HHmmss - - ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml - + ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml @@ -199,9 +197,9 @@ limitations under the License. ${onap.nexus.dockerregistry.daily} ${onap.nexus.dockerregistry.daily}/${docker.image.name} - ${project.version}-${maven.build.timestamp}Z - ${project.version} - latest + ${project.version}-${maven.build.timestamp}Z + ${project.version} + latest ${project.basedir}/src/main/docker @@ -314,13 +312,18 @@ limitations under the License. com.google.code.gson gson - 2.8.5 + 2.8.6 org.json json 20160810 + + com.google.guava + guava + 30.1-jre + com.att.nsa @@ -401,7 +404,7 @@ limitations under the License. 2.10.5 compile - + org.glassfish.jersey.core jersey-client 2.27 @@ -505,6 +508,22 @@ limitations under the License. crypt-password 1.3.1 + + org.onap.dcaegen2.services.sdk.rest.services + cbs-client + 1.8.7 + + + org.onap.dcaegen2.services.sdk.rest.services + dmaap-client + 1.8.7 + + + ch.qos.logback + logback-classic + + + @@ -525,4 +544,4 @@ limitations under the License. http://maven.restlet.com - + \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java new file mode 100755 index 0000000..5975df5 --- /dev/null +++ b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcae; + +import org.onap.dcae.configuration.ConfigurationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +import java.time.Duration; + +/** + * ApplicationConfigurationListener is used to listen at notifications with configuration updates. + */ +public class ApplicationConfigurationListener implements Runnable { + + private static Logger log = LoggerFactory.getLogger(ApplicationConfigurationListener.class); + + private Duration interval; + private boolean terminate = false; + private final ConfigurationHandler configurationHandler; + + /** + * Constructor + * @param interval defines period of time when notification can come + * @param configurationHandler handles notifications + */ + public ApplicationConfigurationListener(Duration interval, ConfigurationHandler configurationHandler) { + this.interval = interval; + this.configurationHandler = configurationHandler; + } + + /** + * Reload listener to start listening for configurations notifications with defined interval. + * @param interval defines period of time when notification can come + */ + public synchronized void reload(Duration interval) { + this.interval = interval; + log.info("Handler configuration was changed. Need to reload configuration handler."); + sendReloadAction(); + } + + /** + * Send reload action listener to notifications all. + */ + synchronized void sendReloadAction() { + this.notifyAll(); + } + + /** + * Start listening for configurations notification. + */ + @Override + public void run() { + Disposable configListener = null; + do { + try { + configListener = listenForConfigurationUpdates(); + synchronized (this) { + log.info("Switch to configuration handler thread. Active waiting for configuration."); + this.wait(); + } + } catch (Exception e) { + log.error("Unexpected error occurred during handling data.", e); + terminate(); + } finally { + stopListeningForConfigurationUpdates(configListener); + } + } while (!this.terminate); + } + + private Disposable listenForConfigurationUpdates() { + return this.configurationHandler.startListen(this.interval); + } + + void terminate() { + this.terminate = true; + } + + /** + * Release resources when there is a need to stop listener + * @param consulListener Handler to configurations listener + */ + void stopListeningForConfigurationUpdates(Disposable consulListener) { + if (consulListener != null) { + consulListener.dispose(); + } + } +} diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 9caf2f6..0dcec98 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ import java.util.Iterator; public class ApplicationSettings { private static final Logger log = LoggerFactory.getLogger(ApplicationSettings.class); - + public static String responseCompatibility; private final String appInvocationDir; private final String configurationFileLocation; private final PropertiesConfiguration properties = new PropertiesConfiguration(); @@ -175,6 +175,9 @@ public class ApplicationSettings { return prependWithUserDirOnRelative(properties.getString("collector.eventinfo", "etc/ont_config.json")); } + public int configurationUpdateFrequency() { + return properties.getInt("collector.dynamic.config.update.frequency", 5); + } public String dMaaPStreamsMapping() { return properties.getString("collector.rcc.dmaap.streamid", null); diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java index 545bfd4..ff749f3 100644 --- a/src/main/java/org/onap/dcae/RestConfCollector.java +++ b/src/main/java/org/onap/dcae/RestConfCollector.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,11 @@ import org.json.JSONArray; import org.json.JSONObject; import org.onap.dcae.common.ControllerActivationState; import org.onap.dcae.common.EventData; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; +import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.onap.dcae.common.publishing.EventPublisher; import org.onap.dcae.common.publishing.PublisherConfig; import org.onap.dcae.controller.AccessController; -import org.onap.dcae.controller.ConfigLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -39,61 +38,98 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.context.ConfigurableApplicationContext; +import org.onap.dcae.configuration.ConfigurationHandler; +import org.onap.dcae.configuration.ConfigUpdater; +import org.onap.dcae.configuration.ConfigUpdaterFactory; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; -import java.util.concurrent.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class RestConfCollector { private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); - private static final int MAX_THREADS = 20; + private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5; private static Logger log = LoggerFactory.getLogger(RestConfCollector.class); public static LinkedBlockingQueue fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; - private static ConfigLoader configLoader; private static SpringApplication app; - private static ScheduledFuture scheduleFeatures; - private static ScheduledFuture scheduleCtrlActivation; - private static ExecutorService executor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static ScheduledThreadPoolExecutor scheduledThExController; private static EventPublisher eventPublisher; - private static EventProcessor eventProcessor; + private static ConfigUpdater configUpdater; + private static ApplicationConfigurationListener applicationConfigurationListener; + private static ReentrantLock applicationLock = new ReentrantLock(); /* List of Controllers */ private static java.util.Map controllerStore = new ConcurrentHashMap<>(); public static void main(String[] args) { + applicationLock.lock(); + try { + startApplication(args); + startListeningForApplicationConfigurationStoredInConsul(); + } finally { + applicationLock.unlock(); + } + } + + private static void startApplication(String[] args) { oplog.info("RestconfController starting"); app = new SpringApplication(RestConfCollector.class); properties = new ApplicationSettings(args, CLIUtils::processCmdLine); + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); scheduledThExController = new ScheduledThreadPoolExecutor(1); init(); + applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul(); app.setAddCommandLineProperties(true); context = app.run(); controllerConfig(properties); - configLoader.updateConfig(); oplog.info("RestConfController running ....."); } + private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() { + ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater); + ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler); + + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + scheduledThreadPoolExecutor.execute(applicationConfigProvider); + + return applicationConfigProvider; + } public static void restartApplication() { Thread thread = new Thread(() -> { - controllerConfigCleanup(); - context.close(); - properties.reloadProperties(); - scheduleFeatures.cancel(true); - scheduleCtrlActivation.cancel(true); - init(); - controllerConfig(properties); - context = SpringApplication.run(RestConfCollector.class); + try { + applicationLock.lock(); + controllerConfigCleanup(); + if (context != null) { + context.close(); + } + properties.reloadProperties(); + init(); + controllerConfig(properties); + context = SpringApplication.run(RestConfCollector.class); + + configUpdater.setPaths(properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + applicationConfigurationListener.reload(Duration.ofMinutes(properties.configurationUpdateFrequency())); + } finally { + applicationLock.unlock(); + } }); thread.setDaemon(false); thread.start(); @@ -102,9 +138,11 @@ public class RestConfCollector { private static void init() { fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); - createConfigLoader(); - createSchedulePoolExecutor(); - createExecutors(); + + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + eventPublisher = new DMaaPEventPublisher(getDmapConfig()); } private static Map getDmapConfig() { @@ -170,41 +208,10 @@ public class RestConfCollector { log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!"); } - private static void createConfigLoader() { - log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation()); - - configLoader = ConfigLoader.create(getEventPublisher()::reconfigure, - Paths.get(properties.dMaaPConfigurationFileLocation()), - properties.configurationFileLocation()); - } - private static EventPublisher getEventPublisher() { return EventPublisher.createPublisher(oplog, getDmapConfig()); } - private static void createSchedulePoolExecutor() { - scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, - 10, - 10, - TimeUnit.MINUTES); - ControllerActivationTask task = new ControllerActivationTask(); - scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task, - 10, - 10, - TimeUnit.SECONDS); - } - - private static void createExecutors() { - eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(eventPublisher, - parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping())); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } - } - private static class ControllerActivationTask implements Runnable { public ControllerActivationTask() { diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 1209b38..e023c57 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,16 +36,14 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ -class DMaaPEventPublisher implements EventPublisher { +public final class DMaaPEventPublisher implements EventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; - private final Logger outputLogger; + private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");; - DMaaPEventPublisher(DMaaPPublishersCache publishersCache, - Logger outputLogger) { - this.publishersCache = publishersCache; - this.outputLogger = outputLogger; + public DMaaPEventPublisher(Map publishersCache) { + this.publishersCache = new DMaaPPublishersCache(publishersCache); } @Override diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100755 index 0000000..5955a9d --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +/** + * DmaapRequestConfiguration is DMaap request configuration. + */ +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * @param timeout timeout configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + /** + * Convert JSON object list. + * @param messages list of messages. + * return flux jsonobject list of messages. + */ + static Flux jsonBatch(List messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + /** + * Retry configuration. + * return MessageRouterPublisherConfig message router publish coinfiguration. + */ + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List getAsJsonObjects(List messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List getAsJsonElements(List messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java index 91736ec..8042215 100644 --- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import org.slf4j.Logger; public interface EventPublisher { static EventPublisher createPublisher(Logger outputLogger, Map dMaaPConfig) { - return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + return new DMaaPEventPublisher(dMaaPConfig); } void sendEvent(JSONObject event, String domain); diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100755 index 0000000..4806127 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,111 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +/** + * MessageRouterHttpStatusMapper is responsible for HTTP status mapper. + */ +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + /** + * Get http status. + * @param messageRouterPublishResponse message reouter publish response. + */ + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100755 index 0000000..6f69f4f --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +/** + * Publisher is responsible publish events. + */ +public class Publisher { + + private final MessageRouterPublisher publisher; + + /** + * Constructor + */ + public Publisher() { + this(retryConfiguration()); + } + + /** + * Constructor + * @param messageRouterPublisherConfig message router publish configuration + */ + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux publishEvents(List events, Option publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux publishEvents(List events, MessageRouterPublishRequest publishRequest) { + final Flux jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 67aca1d..57210c4 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } diff --git a/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java new file mode 100755 index 0000000..0a8c089 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java @@ -0,0 +1,142 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.common.publishing.VavrUtils.logError; +import static org.onap.dcae.configuration.Conversions.toList; + +import io.vavr.CheckedRunnable; +import io.vavr.Tuple2; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigFilesFacade is used for reading and writing application properties and dmaap configuration. + */ +public class ConfigFilesFacade { + + private static final Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class); + + private Path dmaapConfigPath; + private Path propertiesPath; + + ConfigFilesFacade(Path propertiesPath, Path dMaaPConfigPath) { + this.propertiesPath = propertiesPath; + this.dmaapConfigPath = dMaaPConfigPath; + } + + /** + * Set new paths + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public void setPaths(Path propertiesFile, Path dmaapConfigFile) { + this.propertiesPath = propertiesFile; + this.dmaapConfigPath = dmaapConfigFile; + } + + Try> readCollectorProperties() { + log.info(f("Reading collector properties from path: '%s'", propertiesPath)); + return Try(this::readProperties) + .map(prop -> toList(prop.getKeys()).toMap(k -> k, k -> (String) prop.getProperty(k))) + .mapFailure(enhanceError("Unable to read properties configuration from path '%s'", propertiesPath)) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following collector properties: '%s'", props))); + } + + Try readDMaaPConfiguration() { + log.info(f("Reading DMaaP configuration from file: '%s'", dmaapConfigPath)); + return readFile(dmaapConfigPath) + .recover(FileNotFoundException.class, __ -> "{}") + .mapFailure(enhanceError("Unable to read DMaaP configuration from file '%s'", dmaapConfigPath)) + .flatMap(Conversions::toJson) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following DMaaP properties: '%s'", props))); + } + + Try writeDMaaPConfiguration(JSONObject dMaaPConfiguration) { + log.info(f("Writing DMaaP configuration '%s' into file '%s'", dMaaPConfiguration, dmaapConfigPath)); + return writeFile(dmaapConfigPath, indentConfiguration(dMaaPConfiguration.toString())) + .mapFailure(enhanceError("Could not save new DMaaP configuration to path '%s'", dmaapConfigPath)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + + Try writeProperties(Map properties) { + log.info(f("Writing properties configuration '%s' into file '%s'", properties, propertiesPath)); + return Try.run(saveProperties(properties)) + .mapFailure(enhanceError("Could not save properties to path '%s'", properties)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + private Try readFile(Path path) { + return Try(() -> new String(Files.readAllBytes(path), StandardCharsets.UTF_8)) + .mapFailure(enhanceError("Could not read content from path: '%s'", path)); + } + + private Try writeFile(Path path, String content) { + return Try.run(() -> Files.write(path, content.getBytes())) + .mapFailure(enhanceError("Could not write content to path: '%s'", path)); + } + + private PropertiesConfiguration readProperties() throws ConfigurationException { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(); + propertiesConfiguration.setDelimiterParsingDisabled(true); + propertiesConfiguration.load(propertiesPath.toFile()); + return propertiesConfiguration; + } + + private CheckedRunnable saveProperties(Map properties) { + return () -> { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(propertiesPath.toFile()); + propertiesConfiguration.setEncoding(null); + for (Tuple2 property : properties) { + updateProperty(propertiesConfiguration, property); + } + propertiesConfiguration.save(); + }; + } + + private void updateProperty(PropertiesConfiguration propertiesConfiguration, Tuple2 property) { + if (propertiesConfiguration.containsKey(property._1)) { + propertiesConfiguration.setProperty(property._1, property._2); + } else { + propertiesConfiguration.addProperty(property._1, property._2); + } + } + + private String indentConfiguration(String configuration) { + return new JSONObject(configuration).toString(4); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigParsing.java b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java new file mode 100755 index 0000000..7b2a656 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.configuration.Conversions.toList; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +interface ConfigParsing { + + Logger log = LoggerFactory.getLogger(ConfigParsing.class); + + static Option getDMaaPConfig(JSONObject configuration) { + log.info(f("Getting DMaaP configuration from app configuration: '%s'", configuration)); + return toList(configuration.toMap().entrySet().iterator()) + .filter(t -> t.getKey().startsWith("streams_publishes")) + .headOption() + .flatMap(e -> Try(() -> configuration.getJSONObject(e.getKey())).toOption()) + .onEmpty(() -> log.warn(f("App configuration '%s' is missing DMaaP configuration ('streams_publishes' key) " + + "or DMaaP configuration is not a valid json document", configuration))) + .peek(dMaaPConf -> log.info(f("Found following DMaaP configuration: '%s'", dMaaPConf))); + } + + static Map getProperties(JSONObject configuration) { + log.debug(f("Getting properties configuration from app configuration: '%s'", configuration)); + Map confEntries = toList(configuration.toMap().entrySet().iterator()) + .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue()))) + .filterKeys(e -> !e.startsWith("streams_publishes")); + log.debug(f("Found following app properties: '%s'", confEntries)); + return confEntries; + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java new file mode 100755 index 0000000..0d9e660 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java @@ -0,0 +1,118 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; + +/** + * ConfigUpdater is responsible for receiving configuration updates from config file. + */ +public class ConfigUpdater { + + private static final Logger log = LoggerFactory.getLogger(ConfigUpdater.class); + private final ConfigFilesFacade configFilesFacade; + private final Runnable applicationRestarter; + private boolean isApplicationRestartNeeded; + + /** + * Constructor + * @param configFilesFacade provides configuration files facade + * @param applicationRestarter for restart application + */ + public ConfigUpdater(ConfigFilesFacade configFilesFacade, Runnable applicationRestarter) { + this.configFilesFacade = configFilesFacade; + this.applicationRestarter = applicationRestarter; + this.isApplicationRestartNeeded = false; + } + + /** + * Set new paths + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public void setPaths(Path propertiesFile, Path dmaapConfigFile){ + this.configFilesFacade.setPaths(propertiesFile, dmaapConfigFile); + + } + /** + * update configuration + * @param appConfig JSON object list + */ + public synchronized void updateConfig(Option appConfig) { + appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage()); + } + + private Runnable logSkipMessage() { + return () -> log.info("Skipping dynamic configuration"); + } + + private void handleUpdate(JSONObject appConfig) { + updatePropertiesIfChanged(appConfig); + updateDmaapConfigIfChanged(appConfig); + restartApplicationIfNeeded(); + } + + private void updatePropertiesIfChanged(JSONObject appConfig) { + Map newProperties = ConfigParsing.getProperties(appConfig); + Map oldProperties = configFilesFacade.readCollectorProperties().get(); + + if (!areCommonPropertiesSame(oldProperties, newProperties)) { + configFilesFacade.writeProperties(newProperties); + isApplicationRestartNeeded = true; + } + } + + private boolean areCommonPropertiesSame(Map oldProperties, Map newProperties) { + Map filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties); + return filteredOldProperties.equals(newProperties); + } + + private Map filterIntersectingKeys(Map primaryProperties, + Map otherProperties) { + return primaryProperties.filterKeys(key -> containsKey(key, otherProperties)); + } + + private boolean containsKey(String key, Map properties) { + return properties.keySet().contains(key); + } + + private void updateDmaapConfigIfChanged(JSONObject appConfig) { + JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get(); + JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get(); + + if (!oldDmaapConfig.similar(newDmaapConfig)) { + configFilesFacade.writeDMaaPConfiguration(newDmaapConfig); + isApplicationRestartNeeded = true; + } + } + + private void restartApplicationIfNeeded() { + if (isApplicationRestartNeeded) { + applicationRestarter.run(); + isApplicationRestartNeeded = false; + } + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java new file mode 100755 index 0000000..7fc4532 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import java.nio.file.Path; + +import org.onap.dcae.RestConfCollector; + +/** + * ConfigUpdaterFactory is responsible for receiving configuration from config file or Consul (if config file doesn't exist). + */ +public class ConfigUpdaterFactory { + + private ConfigUpdaterFactory() { + } + + /** + * create configuration updater based on property file and dmaap configuration files + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public static ConfigUpdater create(Path propertiesFile, Path dmaapConfigFile) { + ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile); + return new ConfigUpdater( + configFilesFacade, + RestConfCollector::restartApplication); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java new file mode 100755 index 0000000..9495308 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +/** + * ConfigurationHandler is responsible for receiving configuration updates from config file or Consul (if config file doesn't exist). + * Any change made in the configuration will be reported as a notification. + */ +public class ConfigurationHandler { + + private static Logger log = LoggerFactory.getLogger(ConfigurationHandler.class); + private static final String CONFIG_DICT = "config"; + + private final CbsClientConfigurationProvider cbsClientConfigurationProvider; + private final ConfigUpdater configUpdater; + + /** + * Constructor + * @param cbsClientConfigurationProvider provides configuration to connect with Consul + * @param configUpdater for updating application configuration + */ + public ConfigurationHandler(CbsClientConfigurationProvider cbsClientConfigurationProvider, ConfigUpdater configUpdater) { + this.cbsClientConfigurationProvider = cbsClientConfigurationProvider; + this.configUpdater = configUpdater; + } + + /** + * Start listen for application configuration notifications with configuration changes + * @param interval defines period of time when notification can come + * @return {@link Disposable} handler to close configuration listener at the end + */ + public Disposable startListen(Duration interval) { + + log.info("Start listening for configuration ..."); + log.info(String.format("Configuration will be fetched in %s period.", interval)); + + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + + final CbsRequest request = createCbsRequest(); + final CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationProvider.get(); + + return createCbsClient(cbsClientConfiguration) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)) + .subscribe( + this::handleConfiguration, + this::handleError + ); + } + + /** + * Create CBS Client configuration + * @param accept cbsClientConfiguration + * @return return cbs client + */ + Mono createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); + } + + void handleConfiguration(JsonObject jsonObject) { + log.info("Configuration update {}", jsonObject); + if(jsonObject.has(CONFIG_DICT)) { + JsonObject config = jsonObject.getAsJsonObject(CONFIG_DICT); + JSONObject jObject = new JSONObject(config.toString()); + configUpdater.updateConfig(Option.of(jObject)); + } else { + throw new IllegalArgumentException(String.format("Invalid application configuration: %s ", jsonObject)); + } + } + + private void handleError(Throwable throwable) { + log.error("Unexpected error occurred during fetching configuration", throwable); + } + + private CbsRequest createCbsRequest() { + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + return CbsRequests.getAll(diagnosticContext); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/Conversions.java b/src/main/java/org/onap/dcae/configuration/Conversions.java new file mode 100755 index 0000000..98f632c --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/Conversions.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; + +import io.vavr.API; +import io.vavr.collection.List; +import io.vavr.control.Try; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; +import org.json.JSONObject; + +/** + * Conversions is used to convert to JSON Object. + */ +interface Conversions { + + static Try toJson(String strBody) { + return API.Try(() -> new JSONObject(strBody)) + .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody)); + } + + static List toList(Iterator iterator) { + return List + .ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java new file mode 100755 index 0000000..b808004 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration.cbs; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CbsClientConfigurationProvider is used to provide production or dev configuration for CBS client. + */ +public class CbsClientConfigurationProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationProvider.class); + + private static final String DEFAULT_PROTOCOL = "http"; + private static final String DEFAULT_HOSTNAME = "config-binding-service"; + private static final int DEFAULT_PORT = 10000; + private static final String DEFAULT_APP_NAME = "dcae-ves-collector"; + private static final String DEV_MODE_PROPERTY = "devMode"; + private static final String CBS_PORT_PROPERTY = "cbsPort"; + + /** + * Returns configuration for CBS client. + * @return Production or dev configuration for CBS client, depends on application run arguments. + */ + public CbsClientConfiguration get() { + try { + if (isDevModeEnabled()) { + return getDevConfiguration(); + } else { + return CbsClientConfiguration.fromEnvironment(); + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed resolving CBS client configuration from system environments: %s", e)); + } + return getFallbackConfiguration(); + } + + @NotNull + private ImmutableCbsClientConfiguration getDevConfiguration() { + return createCbsClientConfiguration( + DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, + Integer.parseInt(System.getProperty(CBS_PORT_PROPERTY, String.valueOf(DEFAULT_PORT))) + ); + } + + private boolean isDevModeEnabled() { + return System.getProperty(DEV_MODE_PROPERTY) != null; + } + + private ImmutableCbsClientConfiguration getFallbackConfiguration() { + LOGGER.info("Falling back to use default CBS client configuration"); + return createCbsClientConfiguration(DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, DEFAULT_PORT); + } + + private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname, + String appName, Integer port) { + return ImmutableCbsClientConfiguration.builder() + .protocol(protocol) + .hostname(hostname) + .port(port) + .appName(appName) + .build(); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java index d8c21b1..426ff87 100644 --- a/src/main/java/org/onap/dcae/restapi/ApiException.java +++ b/src/main/java/org/onap/dcae/restapi/ApiException.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,21 +23,38 @@ package org.onap.dcae.restapi; import com.google.common.base.CaseFormat; import org.json.JSONObject; +import java.util.ArrayList; +import java.util.List; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public enum ApiException { - UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401); + + UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401), + NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404), + REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408), + TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429), + INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500), + BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502), + SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503), + GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504); public final int httpStatusCode; private final ExceptionType type; private final String code; private final String details; + private final List variables; ApiException(ExceptionType type, String code, String details, int httpStatusCode) { + this(type, code, details, new ArrayList<>(), httpStatusCode); + } + + ApiException(ExceptionType type, String code, String details, List variables, int httpStatusCode) { this.type = type; this.code = code; this.details = details; + this.variables = variables; this.httpStatusCode = httpStatusCode; } @@ -45,6 +62,9 @@ public enum ApiException { JSONObject exceptionTypeNode = new JSONObject(); exceptionTypeNode.put("messageId", code); exceptionTypeNode.put("text", details); + if(!variables.isEmpty()) { + exceptionTypeNode.put("variables", variables); + } JSONObject requestErrorNode = new JSONObject(); requestErrorNode.put(type.toString(), exceptionTypeNode); @@ -55,7 +75,7 @@ public enum ApiException { } public enum ExceptionType { - POLICY_EXCEPTION; + SERVICE_EXCEPTION, POLICY_EXCEPTION; @Override public String toString() { diff --git a/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java new file mode 100755 index 0000000..fca296b --- /dev/null +++ b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.dcae.configuration.ConfigurationHandler; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; + +@RunWith(MockitoJUnitRunner.class) +public class ApplicationConfigurationListenerTest { + + @Mock + private ConfigurationHandler configurationHandler; + + @InjectMocks + @Spy + private ApplicationConfigurationListener applicationConfigurationListener; + + @Test + public void shouldStopJobAndCloseConnectionWhenErrorOccurredDuringListenAtConfigChange() { + + // given + Mockito.doThrow(new RuntimeException("Simulate exception")).when(configurationHandler).startListen(any()); + + // when + applicationConfigurationListener.run(); + + // then + Mockito.verify(applicationConfigurationListener).stopListeningForConfigurationUpdates(any()); + } + + @Test + public void shouldSendReloadAction() { + + // when + applicationConfigurationListener.reload(Duration.ofMillis(1)); + + // then + Mockito.verify(applicationConfigurationListener).sendReloadAction(); + } +} diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java deleted file mode 100644 index 79ac03b..0000000 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.restconfcollector - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common.publishing; - -import static io.vavr.API.Option; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import org.json.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; - -public class DMaaPEventPublisherTest { - - private static final String STREAM_ID = "sampleStreamId"; - - private DMaaPEventPublisher eventPublisher; - private CambriaBatchingPublisher cambriaPublisher; - private DMaaPPublishersCache dmaapPublishersCache; - - /** - * Setup before test. - */ - @Before - public void setUp() { - cambriaPublisher = mock(CambriaBatchingPublisher.class); - dmaapPublishersCache = mock(DMaaPPublishersCache.class); - when(dmaapPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); - eventPublisher = new DMaaPEventPublisher(dmaapPublishersCache, mock(Logger.class)); - } - - @Test - public void shouldSendEventToTopic() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(cambriaPublisher).send("MyPartitionKey", event.toString()); - } - - - - @Test - public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(dmaapPublishersCache).closePublisherFor(STREAM_ID); - } -} \ No newline at end of file diff --git a/version.properties b/version.properties index 7d6815b..fee4928 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=3 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg