diff options
22 files changed, 1224 insertions, 163 deletions
diff --git a/Changelog.md b/Changelog.md index d17a970..8c8d5c7 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.3.1] - 2022/01/20 + - [DCAEGEN2-2857] - RESTConf - Switch CBS client library to 1.8.7 or higher + ## [1.3.0] - 2022/01/18 - [DCAEGEN2-2962] - Switch RESTCONF Collector to Integration base image - [DCAEGEN2-3022] - Remediation for Log4Shell vulnerability (upgrade to 2.17.1) @@ -23,11 +23,11 @@ limitations under the License. <groupId>org.onap.oparent</groupId> <artifactId>oparent</artifactId> <version>2.0.0</version> - <relativePath/> + <relativePath /> </parent> <groupId>org.onap.dcaegen2.collectors.restconf</groupId> <artifactId>restconfcollector</artifactId> - <version>1.3.0-SNAPSHOT</version> + <version>1.3.1-SNAPSHOT</version> <name>dcaegen2-collectors-restconf</name> <description>RestConfCollector</description> <properties> @@ -48,9 +48,7 @@ limitations under the License. <releases.path>content/repositories/releases/</releases.path> <site.path>content/sites/site/org/onap/dcaegen2/collectors/restconf/${project.artifactId}/${project.version}</site.path> <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format> - <sonar.coverage.jacoco.xmlReportPaths> - ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml - </sonar.coverage.jacoco.xmlReportPaths> + <sonar.coverage.jacoco.xmlReportPaths>${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths> </properties> <build> <pluginManagement> @@ -199,9 +197,9 @@ limitations under the License. <serverId>${onap.nexus.dockerregistry.daily}</serverId> <imageName>${onap.nexus.dockerregistry.daily}/${docker.image.name}</imageName> <imageTags> - <imageTag>${project.version}-${maven.build.timestamp}Z</imageTag> - <imageTag>${project.version}</imageTag> - <imageTag>latest</imageTag> + <imageTag>${project.version}-${maven.build.timestamp}Z</imageTag> + <imageTag>${project.version}</imageTag> + <imageTag>latest</imageTag> </imageTags> <dockerDirectory>${project.basedir}/src/main/docker</dockerDirectory> <resources> @@ -314,13 +312,18 @@ limitations under the License. <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>2.8.5</version> + <version>2.8.6</version> </dependency> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20160810</version> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>30.1-jre</version> + </dependency> <!-- REST API RELATED --> <dependency> <groupId>com.att.nsa</groupId> @@ -401,7 +404,7 @@ limitations under the License. <version>2.10.5</version> <scope>compile</scope> </dependency> - <dependency> + <dependency> <groupId>org.glassfish.jersey.core</groupId> <artifactId>jersey-client</artifactId> <version>2.27</version> @@ -505,6 +508,22 @@ limitations under the License. <artifactId>crypt-password</artifactId> <version>1.3.1</version> </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>cbs-client</artifactId> + <version>1.8.7</version> + </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>dmaap-client</artifactId> + <version>1.8.7</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <repositories> <repository> @@ -525,4 +544,4 @@ limitations under the License. <url>http://maven.restlet.com</url> </pluginRepository> </pluginRepositories> -</project> +</project>
\ No newline at end of file diff --git a/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java new file mode 100755 index 0000000..5975df5 --- /dev/null +++ b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcae; + +import org.onap.dcae.configuration.ConfigurationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; + +import java.time.Duration; + +/** + * ApplicationConfigurationListener is used to listen at notifications with configuration updates. + */ +public class ApplicationConfigurationListener implements Runnable { + + private static Logger log = LoggerFactory.getLogger(ApplicationConfigurationListener.class); + + private Duration interval; + private boolean terminate = false; + private final ConfigurationHandler configurationHandler; + + /** + * Constructor + * @param interval defines period of time when notification can come + * @param configurationHandler handles notifications + */ + public ApplicationConfigurationListener(Duration interval, ConfigurationHandler configurationHandler) { + this.interval = interval; + this.configurationHandler = configurationHandler; + } + + /** + * Reload listener to start listening for configurations notifications with defined interval. + * @param interval defines period of time when notification can come + */ + public synchronized void reload(Duration interval) { + this.interval = interval; + log.info("Handler configuration was changed. Need to reload configuration handler."); + sendReloadAction(); + } + + /** + * Send reload action listener to notifications all. + */ + synchronized void sendReloadAction() { + this.notifyAll(); + } + + /** + * Start listening for configurations notification. + */ + @Override + public void run() { + Disposable configListener = null; + do { + try { + configListener = listenForConfigurationUpdates(); + synchronized (this) { + log.info("Switch to configuration handler thread. Active waiting for configuration."); + this.wait(); + } + } catch (Exception e) { + log.error("Unexpected error occurred during handling data.", e); + terminate(); + } finally { + stopListeningForConfigurationUpdates(configListener); + } + } while (!this.terminate); + } + + private Disposable listenForConfigurationUpdates() { + return this.configurationHandler.startListen(this.interval); + } + + void terminate() { + this.terminate = true; + } + + /** + * Release resources when there is a need to stop listener + * @param consulListener Handler to configurations listener + */ + void stopListeningForConfigurationUpdates(Disposable consulListener) { + if (consulListener != null) { + consulListener.dispose(); + } + } +} diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java index 9caf2f6..0dcec98 100644 --- a/src/main/java/org/onap/dcae/ApplicationSettings.java +++ b/src/main/java/org/onap/dcae/ApplicationSettings.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ import java.util.Iterator; public class ApplicationSettings { private static final Logger log = LoggerFactory.getLogger(ApplicationSettings.class); - + public static String responseCompatibility; private final String appInvocationDir; private final String configurationFileLocation; private final PropertiesConfiguration properties = new PropertiesConfiguration(); @@ -175,6 +175,9 @@ public class ApplicationSettings { return prependWithUserDirOnRelative(properties.getString("collector.eventinfo", "etc/ont_config.json")); } + public int configurationUpdateFrequency() { + return properties.getInt("collector.dynamic.config.update.frequency", 5); + } public String dMaaPStreamsMapping() { return properties.getString("collector.rcc.dmaap.streamid", null); diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java index 545bfd4..ff749f3 100644 --- a/src/main/java/org/onap/dcae/RestConfCollector.java +++ b/src/main/java/org/onap/dcae/RestConfCollector.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,11 @@ import org.json.JSONArray; import org.json.JSONObject; import org.onap.dcae.common.ControllerActivationState; import org.onap.dcae.common.EventData; -import org.onap.dcae.common.EventProcessor; import org.onap.dcae.common.publishing.DMaaPConfigurationParser; +import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.onap.dcae.common.publishing.EventPublisher; import org.onap.dcae.common.publishing.PublisherConfig; import org.onap.dcae.controller.AccessController; -import org.onap.dcae.controller.ConfigLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -39,61 +38,98 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; import org.springframework.context.ConfigurableApplicationContext; +import org.onap.dcae.configuration.ConfigurationHandler; +import org.onap.dcae.configuration.ConfigUpdater; +import org.onap.dcae.configuration.ConfigUpdaterFactory; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; -import java.util.concurrent.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class}) public class RestConfCollector { private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output"); - private static final int MAX_THREADS = 20; + private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5; private static Logger log = LoggerFactory.getLogger(RestConfCollector.class); public static LinkedBlockingQueue<EventData> fProcessingInputQueue; private static ApplicationSettings properties; private static ConfigurableApplicationContext context; - private static ConfigLoader configLoader; private static SpringApplication app; - private static ScheduledFuture<?> scheduleFeatures; - private static ScheduledFuture<?> scheduleCtrlActivation; - private static ExecutorService executor; private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static ScheduledThreadPoolExecutor scheduledThExController; private static EventPublisher eventPublisher; - private static EventProcessor eventProcessor; + private static ConfigUpdater configUpdater; + private static ApplicationConfigurationListener applicationConfigurationListener; + private static ReentrantLock applicationLock = new ReentrantLock(); /* List of Controllers */ private static java.util.Map<String, AccessController> controllerStore = new ConcurrentHashMap<>(); public static void main(String[] args) { + applicationLock.lock(); + try { + startApplication(args); + startListeningForApplicationConfigurationStoredInConsul(); + } finally { + applicationLock.unlock(); + } + } + + private static void startApplication(String[] args) { oplog.info("RestconfController starting"); app = new SpringApplication(RestConfCollector.class); properties = new ApplicationSettings(args, CLIUtils::processCmdLine); + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); scheduledThExController = new ScheduledThreadPoolExecutor(1); init(); + applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul(); app.setAddCommandLineProperties(true); context = app.run(); controllerConfig(properties); - configLoader.updateConfig(); oplog.info("RestConfController running ....."); } + private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() { + ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater); + ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler); + + ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); + scheduledThreadPoolExecutor.execute(applicationConfigProvider); + + return applicationConfigProvider; + } public static void restartApplication() { Thread thread = new Thread(() -> { - controllerConfigCleanup(); - context.close(); - properties.reloadProperties(); - scheduleFeatures.cancel(true); - scheduleCtrlActivation.cancel(true); - init(); - controllerConfig(properties); - context = SpringApplication.run(RestConfCollector.class); + try { + applicationLock.lock(); + controllerConfigCleanup(); + if (context != null) { + context.close(); + } + properties.reloadProperties(); + init(); + controllerConfig(properties); + context = SpringApplication.run(RestConfCollector.class); + + configUpdater.setPaths(properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + applicationConfigurationListener.reload(Duration.ofMinutes(properties.configurationUpdateFrequency())); + } finally { + applicationLock.unlock(); + } }); thread.setDaemon(false); thread.start(); @@ -102,9 +138,11 @@ public class RestConfCollector { private static void init() { fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents()); - createConfigLoader(); - createSchedulePoolExecutor(); - createExecutors(); + + configUpdater = ConfigUpdaterFactory.create( + properties.configurationFileLocation(), + Paths.get(properties.dMaaPConfigurationFileLocation())); + eventPublisher = new DMaaPEventPublisher(getDmapConfig()); } private static Map<String, PublisherConfig> getDmapConfig() { @@ -170,41 +208,10 @@ public class RestConfCollector { log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!"); } - private static void createConfigLoader() { - log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation()); - - configLoader = ConfigLoader.create(getEventPublisher()::reconfigure, - Paths.get(properties.dMaaPConfigurationFileLocation()), - properties.configurationFileLocation()); - } - private static EventPublisher getEventPublisher() { return EventPublisher.createPublisher(oplog, getDmapConfig()); } - private static void createSchedulePoolExecutor() { - scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig, - 10, - 10, - TimeUnit.MINUTES); - ControllerActivationTask task = new ControllerActivationTask(); - scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task, - 10, - 10, - TimeUnit.SECONDS); - } - - private static void createExecutors() { - eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig()); - eventProcessor = new EventProcessor(eventPublisher, - parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping())); - - executor = Executors.newFixedThreadPool(MAX_THREADS); - for (int i = 0; i < MAX_THREADS; ++i) { - executor.execute(eventProcessor); - } - } - private static class ControllerActivationTask implements Runnable { public ControllerActivationTask() { diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 1209b38..e023c57 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -4,7 +4,7 @@ * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,16 +36,14 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ -class DMaaPEventPublisher implements EventPublisher { +public final class DMaaPEventPublisher implements EventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; - private final Logger outputLogger; + private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");; - DMaaPEventPublisher(DMaaPPublishersCache publishersCache, - Logger outputLogger) { - this.publishersCache = publishersCache; - this.outputLogger = outputLogger; + public DMaaPEventPublisher(Map<String, PublisherConfig> publishersCache) { + this.publishersCache = new DMaaPPublishersCache(publishersCache); } @Override diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java new file mode 100755 index 0000000..5955a9d --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java @@ -0,0 +1,123 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import java.time.Duration; + +/** + * DmaapRequestConfiguration is DMaap request configuration. + */ +public class DmaapRequestConfiguration { + + private static final Long TIMEOUT_SECONDS = 10L; + private static final int RETRY_INTERVAL_IN_SECONDS = 1; + private static final int RETRY_COUNT = 1; + + private DmaapRequestConfiguration() { + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * @param timeout timeout configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) { + String topicUrl = createUrl(publisherConfig); + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(createMessageRouterSink(topicUrl)) + .contentType(ContentType.APPLICATION_JSON) + .timeoutConfig(timeOutConfiguration(timeout)) + .build(); + } + + /** + * Create publish request is DMaap request configuration. + * @param publisherConfig publisher configuration + * return message reouter publish request + */ + static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) { + return createPublishRequest(publisherConfig, TIMEOUT_SECONDS); + } + + /** + * Convert JSON object list. + * @param messages list of messages. + * return flux jsonobject list of messages. + */ + static Flux<JsonObject> jsonBatch(List<String> messages) { + return Flux.fromIterable(getAsJsonObjects(messages)); + } + + /** + * Retry configuration. + * return MessageRouterPublisherConfig message router publish coinfiguration. + */ + static MessageRouterPublisherConfig retryConfiguration() { + return ImmutableMessageRouterPublisherConfig.builder() + .retryConfig(ImmutableDmaapRetryConfig.builder() + .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS) + .retryCount(RETRY_COUNT) + .build()) + .build(); + } + + private static String createUrl(Option<PublisherConfig> publisherConfig) { + String hostAndPort = publisherConfig.get().getHostAndPort(); + String topicName = publisherConfig.get().topic(); + return String.format("http://%s/events/%s/",hostAndPort,topicName); + } + + private static List<JsonObject> getAsJsonObjects(List<String> messages) { + return getAsJsonElements(messages).map(JsonElement::getAsJsonObject); + } + + static List<JsonElement> getAsJsonElements(List<String> messages) { + return messages.map(JsonParser::parseString); + } + + static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) { + return ImmutableMessageRouterSink.builder() + .name("the topic") + .topicUrl(topicUrl) + .build(); + } + + @NotNull + private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) { + return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build(); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java index 91736ec..8042215 100644 --- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ import org.slf4j.Logger; public interface EventPublisher { static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) { - return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + return new DMaaPEventPublisher(dMaaPConfig); } void sendEvent(JSONObject event, String domain); diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java new file mode 100755 index 0000000..4806127 --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java @@ -0,0 +1,111 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcae.restapi.ApiException; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +import java.util.Objects; + +import static org.onap.dcae.ApplicationSettings.responseCompatibility; + +/** + * MessageRouterHttpStatusMapper is responsible for HTTP status mapper. + */ +public class MessageRouterHttpStatusMapper { + + private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class); + + private MessageRouterHttpStatusMapper() { + } + + /** + * Get http status. + * @param messageRouterPublishResponse message reouter publish response. + */ + @NotNull + static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) { + return responseCompatibility.equals("v7.2") ? + getHttpStatusBackwardsCompatibility(messageRouterPublishResponse): + getHttpStatusWithMappedResponseCode(messageRouterPublishResponse); + } + + @NotNull + private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.ACCEPTED; + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) { + if (isHttpOk(messageRouterPublishResponse)) { + log.info("Successfully send event to MR"); + return HttpStatus.OK; + } else if (isHttp413(messageRouterPublishResponse)) { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } else { + log.error(messageRouterPublishResponse.failReason()); + throw new RuntimeException(); + } + } + + @NotNull + private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3); + } + + @NotNull + private static ApiException responseBody(String substring) { + switch (substring) { + case "404": + return ApiException.NOT_FOUND; + case "408": + return ApiException.REQUEST_TIMEOUT; + case "429": + return ApiException.TOO_MANY_REQUESTS; + case "502": + return ApiException.BAD_GATEWAY; + case "503": + return ApiException.SERVICE_UNAVAILABLE; + case "504": + return ApiException.GATEWAY_TIMEOUT; + default: + return ApiException.INTERNAL_SERVER_ERROR; + } + } + + private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) { + return messageRouterPublishResponse.successful(); + } + + private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) { + return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413"); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java new file mode 100755 index 0000000..6f69f4f --- /dev/null +++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.common.publishing; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import reactor.core.publisher.Flux; + +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest; +import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch; + +/** + * Publisher is responsible publish events. + */ +public class Publisher { + + private final MessageRouterPublisher publisher; + + /** + * Constructor + */ + public Publisher() { + this(retryConfiguration()); + } + + /** + * Constructor + * @param messageRouterPublisherConfig message router publish configuration + */ + public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) { + publisher = DmaapClientFactory + .createMessageRouterPublisher(messageRouterPublisherConfig); + } + + /** + * Publish event + * + * @param events list of ves events prepared to send + * @param publisherConfig publisher configuration + * @return flux containing information about the success or failure of the event publication + */ + public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) { + return publishEvents(events, createPublishRequest(publisherConfig)); + } + + Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) { + final Flux<JsonObject> jsonMessageBatch = jsonBatch(events); + return publisher.put(publishRequest, jsonMessageBatch); + } +} diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java index 67aca1d..57210c4 100644 --- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java +++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,10 @@ public final class PublisherConfig { return destinations; } + String getHostAndPort(){ + return destinations.get(0); + } + String topic() { return topic; } diff --git a/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java new file mode 100755 index 0000000..0a8c089 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java @@ -0,0 +1,142 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static io.vavr.API.Try; +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.common.publishing.VavrUtils.logError; +import static org.onap.dcae.configuration.Conversions.toList; + +import io.vavr.CheckedRunnable; +import io.vavr.Tuple2; +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigFilesFacade is used for reading and writing application properties and dmaap configuration. + */ +public class ConfigFilesFacade { + + private static final Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class); + + private Path dmaapConfigPath; + private Path propertiesPath; + + ConfigFilesFacade(Path propertiesPath, Path dMaaPConfigPath) { + this.propertiesPath = propertiesPath; + this.dmaapConfigPath = dMaaPConfigPath; + } + + /** + * Set new paths + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public void setPaths(Path propertiesFile, Path dmaapConfigFile) { + this.propertiesPath = propertiesFile; + this.dmaapConfigPath = dmaapConfigFile; + } + + Try<Map<String, String>> readCollectorProperties() { + log.info(f("Reading collector properties from path: '%s'", propertiesPath)); + return Try(this::readProperties) + .map(prop -> toList(prop.getKeys()).toMap(k -> k, k -> (String) prop.getProperty(k))) + .mapFailure(enhanceError("Unable to read properties configuration from path '%s'", propertiesPath)) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following collector properties: '%s'", props))); + } + + Try<JSONObject> readDMaaPConfiguration() { + log.info(f("Reading DMaaP configuration from file: '%s'", dmaapConfigPath)); + return readFile(dmaapConfigPath) + .recover(FileNotFoundException.class, __ -> "{}") + .mapFailure(enhanceError("Unable to read DMaaP configuration from file '%s'", dmaapConfigPath)) + .flatMap(Conversions::toJson) + .onFailure(logError(log)) + .peek(props -> log.info(f("Read following DMaaP properties: '%s'", props))); + } + + Try<Void> writeDMaaPConfiguration(JSONObject dMaaPConfiguration) { + log.info(f("Writing DMaaP configuration '%s' into file '%s'", dMaaPConfiguration, dmaapConfigPath)); + return writeFile(dmaapConfigPath, indentConfiguration(dMaaPConfiguration.toString())) + .mapFailure(enhanceError("Could not save new DMaaP configuration to path '%s'", dmaapConfigPath)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + + Try<Void> writeProperties(Map<String, String> properties) { + log.info(f("Writing properties configuration '%s' into file '%s'", properties, propertiesPath)); + return Try.run(saveProperties(properties)) + .mapFailure(enhanceError("Could not save properties to path '%s'", properties)) + .onFailure(logError(log)) + .peek(__ -> log.info("Written successfully")); + } + + private Try<String> readFile(Path path) { + return Try(() -> new String(Files.readAllBytes(path), StandardCharsets.UTF_8)) + .mapFailure(enhanceError("Could not read content from path: '%s'", path)); + } + + private Try<Void> writeFile(Path path, String content) { + return Try.run(() -> Files.write(path, content.getBytes())) + .mapFailure(enhanceError("Could not write content to path: '%s'", path)); + } + + private PropertiesConfiguration readProperties() throws ConfigurationException { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(); + propertiesConfiguration.setDelimiterParsingDisabled(true); + propertiesConfiguration.load(propertiesPath.toFile()); + return propertiesConfiguration; + } + + private CheckedRunnable saveProperties(Map<String, String> properties) { + return () -> { + PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(propertiesPath.toFile()); + propertiesConfiguration.setEncoding(null); + for (Tuple2<String, String> property : properties) { + updateProperty(propertiesConfiguration, property); + } + propertiesConfiguration.save(); + }; + } + + private void updateProperty(PropertiesConfiguration propertiesConfiguration, Tuple2<String, String> property) { + if (propertiesConfiguration.containsKey(property._1)) { + propertiesConfiguration.setProperty(property._1, property._2); + } else { + propertiesConfiguration.addProperty(property._1, property._2); + } + } + + private String indentConfiguration(String configuration) { + return new JSONObject(configuration).toString(4); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigParsing.java b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java new file mode 100755 index 0000000..7b2a656 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static org.onap.dcae.common.publishing.VavrUtils.f; +import static org.onap.dcae.configuration.Conversions.toList; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +interface ConfigParsing { + + Logger log = LoggerFactory.getLogger(ConfigParsing.class); + + static Option<JSONObject> getDMaaPConfig(JSONObject configuration) { + log.info(f("Getting DMaaP configuration from app configuration: '%s'", configuration)); + return toList(configuration.toMap().entrySet().iterator()) + .filter(t -> t.getKey().startsWith("streams_publishes")) + .headOption() + .flatMap(e -> Try(() -> configuration.getJSONObject(e.getKey())).toOption()) + .onEmpty(() -> log.warn(f("App configuration '%s' is missing DMaaP configuration ('streams_publishes' key) " + + "or DMaaP configuration is not a valid json document", configuration))) + .peek(dMaaPConf -> log.info(f("Found following DMaaP configuration: '%s'", dMaaPConf))); + } + + static Map<String, String> getProperties(JSONObject configuration) { + log.debug(f("Getting properties configuration from app configuration: '%s'", configuration)); + Map<String, String> confEntries = toList(configuration.toMap().entrySet().iterator()) + .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue()))) + .filterKeys(e -> !e.startsWith("streams_publishes")); + log.debug(f("Found following app properties: '%s'", confEntries)); + return confEntries; + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java new file mode 100755 index 0000000..0d9e660 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java @@ -0,0 +1,118 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; + +/** + * ConfigUpdater is responsible for receiving configuration updates from config file. + */ +public class ConfigUpdater { + + private static final Logger log = LoggerFactory.getLogger(ConfigUpdater.class); + private final ConfigFilesFacade configFilesFacade; + private final Runnable applicationRestarter; + private boolean isApplicationRestartNeeded; + + /** + * Constructor + * @param configFilesFacade provides configuration files facade + * @param applicationRestarter for restart application + */ + public ConfigUpdater(ConfigFilesFacade configFilesFacade, Runnable applicationRestarter) { + this.configFilesFacade = configFilesFacade; + this.applicationRestarter = applicationRestarter; + this.isApplicationRestartNeeded = false; + } + + /** + * Set new paths + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public void setPaths(Path propertiesFile, Path dmaapConfigFile){ + this.configFilesFacade.setPaths(propertiesFile, dmaapConfigFile); + + } + /** + * update configuration + * @param appConfig JSON object list + */ + public synchronized void updateConfig(Option<JSONObject> appConfig) { + appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage()); + } + + private Runnable logSkipMessage() { + return () -> log.info("Skipping dynamic configuration"); + } + + private void handleUpdate(JSONObject appConfig) { + updatePropertiesIfChanged(appConfig); + updateDmaapConfigIfChanged(appConfig); + restartApplicationIfNeeded(); + } + + private void updatePropertiesIfChanged(JSONObject appConfig) { + Map<String, String> newProperties = ConfigParsing.getProperties(appConfig); + Map<String, String> oldProperties = configFilesFacade.readCollectorProperties().get(); + + if (!areCommonPropertiesSame(oldProperties, newProperties)) { + configFilesFacade.writeProperties(newProperties); + isApplicationRestartNeeded = true; + } + } + + private boolean areCommonPropertiesSame(Map<String, String> oldProperties, Map<String, String> newProperties) { + Map<String, String> filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties); + return filteredOldProperties.equals(newProperties); + } + + private Map<String, String> filterIntersectingKeys(Map<String, String> primaryProperties, + Map<String, String> otherProperties) { + return primaryProperties.filterKeys(key -> containsKey(key, otherProperties)); + } + + private boolean containsKey(String key, Map<String, String> properties) { + return properties.keySet().contains(key); + } + + private void updateDmaapConfigIfChanged(JSONObject appConfig) { + JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get(); + JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get(); + + if (!oldDmaapConfig.similar(newDmaapConfig)) { + configFilesFacade.writeDMaaPConfiguration(newDmaapConfig); + isApplicationRestartNeeded = true; + } + } + + private void restartApplicationIfNeeded() { + if (isApplicationRestartNeeded) { + applicationRestarter.run(); + isApplicationRestartNeeded = false; + } + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java new file mode 100755 index 0000000..7fc4532 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import java.nio.file.Path; + +import org.onap.dcae.RestConfCollector; + +/** + * ConfigUpdaterFactory is responsible for receiving configuration from config file or Consul (if config file doesn't exist). + */ +public class ConfigUpdaterFactory { + + private ConfigUpdaterFactory() { + } + + /** + * create configuration updater based on property file and dmaap configuration files + * @param propertiesFile application property file + * @param dmaapConfigFile dmaap configuration file + */ + public static ConfigUpdater create(Path propertiesFile, Path dmaapConfigFile) { + ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile); + return new ConfigUpdater( + configFilesFacade, + RestConfCollector::restartApplication); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java new file mode 100755 index 0000000..9495308 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.json.JSONObject; +import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +/** + * ConfigurationHandler is responsible for receiving configuration updates from config file or Consul (if config file doesn't exist). + * Any change made in the configuration will be reported as a notification. + */ +public class ConfigurationHandler { + + private static Logger log = LoggerFactory.getLogger(ConfigurationHandler.class); + private static final String CONFIG_DICT = "config"; + + private final CbsClientConfigurationProvider cbsClientConfigurationProvider; + private final ConfigUpdater configUpdater; + + /** + * Constructor + * @param cbsClientConfigurationProvider provides configuration to connect with Consul + * @param configUpdater for updating application configuration + */ + public ConfigurationHandler(CbsClientConfigurationProvider cbsClientConfigurationProvider, ConfigUpdater configUpdater) { + this.cbsClientConfigurationProvider = cbsClientConfigurationProvider; + this.configUpdater = configUpdater; + } + + /** + * Start listen for application configuration notifications with configuration changes + * @param interval defines period of time when notification can come + * @return {@link Disposable} handler to close configuration listener at the end + */ + public Disposable startListen(Duration interval) { + + log.info("Start listening for configuration ..."); + log.info(String.format("Configuration will be fetched in %s period.", interval)); + + // Polling properties + final Duration initialDelay = Duration.ofSeconds(5); + final Duration period = interval; + + final CbsRequest request = createCbsRequest(); + final CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationProvider.get(); + + return createCbsClient(cbsClientConfiguration) + .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)) + .subscribe( + this::handleConfiguration, + this::handleError + ); + } + + /** + * Create CBS Client configuration + * @param accept cbsClientConfiguration + * @return return cbs client + */ + Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); + } + + void handleConfiguration(JsonObject jsonObject) { + log.info("Configuration update {}", jsonObject); + if(jsonObject.has(CONFIG_DICT)) { + JsonObject config = jsonObject.getAsJsonObject(CONFIG_DICT); + JSONObject jObject = new JSONObject(config.toString()); + configUpdater.updateConfig(Option.of(jObject)); + } else { + throw new IllegalArgumentException(String.format("Invalid application configuration: %s ", jsonObject)); + } + } + + private void handleError(Throwable throwable) { + log.error("Unexpected error occurred during fetching configuration", throwable); + } + + private CbsRequest createCbsRequest() { + RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + return CbsRequests.getAll(diagnosticContext); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/Conversions.java b/src/main/java/org/onap/dcae/configuration/Conversions.java new file mode 100755 index 0000000..98f632c --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/Conversions.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration; + +import static org.onap.dcae.common.publishing.VavrUtils.enhanceError; + +import io.vavr.API; +import io.vavr.collection.List; +import io.vavr.control.Try; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; +import org.json.JSONObject; + +/** + * Conversions is used to convert to JSON Object. + */ +interface Conversions { + + static Try<JSONObject> toJson(String strBody) { + return API.Try(() -> new JSONObject(strBody)) + .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody)); + } + + static <T> List<T> toList(Iterator<T> iterator) { + return List + .ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)); + } +} diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java new file mode 100755 index 0000000..b808004 --- /dev/null +++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java @@ -0,0 +1,85 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.configuration.cbs; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CbsClientConfigurationProvider is used to provide production or dev configuration for CBS client. + */ +public class CbsClientConfigurationProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationProvider.class); + + private static final String DEFAULT_PROTOCOL = "http"; + private static final String DEFAULT_HOSTNAME = "config-binding-service"; + private static final int DEFAULT_PORT = 10000; + private static final String DEFAULT_APP_NAME = "dcae-ves-collector"; + private static final String DEV_MODE_PROPERTY = "devMode"; + private static final String CBS_PORT_PROPERTY = "cbsPort"; + + /** + * Returns configuration for CBS client. + * @return Production or dev configuration for CBS client, depends on application run arguments. + */ + public CbsClientConfiguration get() { + try { + if (isDevModeEnabled()) { + return getDevConfiguration(); + } else { + return CbsClientConfiguration.fromEnvironment(); + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed resolving CBS client configuration from system environments: %s", e)); + } + return getFallbackConfiguration(); + } + + @NotNull + private ImmutableCbsClientConfiguration getDevConfiguration() { + return createCbsClientConfiguration( + DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, + Integer.parseInt(System.getProperty(CBS_PORT_PROPERTY, String.valueOf(DEFAULT_PORT))) + ); + } + + private boolean isDevModeEnabled() { + return System.getProperty(DEV_MODE_PROPERTY) != null; + } + + private ImmutableCbsClientConfiguration getFallbackConfiguration() { + LOGGER.info("Falling back to use default CBS client configuration"); + return createCbsClientConfiguration(DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, DEFAULT_PORT); + } + + private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname, + String appName, Integer port) { + return ImmutableCbsClientConfiguration.builder() + .protocol(protocol) + .hostname(hostname) + .port(port) + .appName(appName) + .build(); + } +} diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java index d8c21b1..426ff87 100644 --- a/src/main/java/org/onap/dcae/restapi/ApiException.java +++ b/src/main/java/org/onap/dcae/restapi/ApiException.java @@ -3,7 +3,7 @@ * org.onap.dcaegen2.restconfcollector * ================================================================================ * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. + * Copyright (C) 2018-2022 Huawei. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,21 +23,38 @@ package org.onap.dcae.restapi; import com.google.common.base.CaseFormat; import org.json.JSONObject; +import java.util.ArrayList; +import java.util.List; + /** * @author Pawel Szalapski (pawel.szalapski@nokia.com) */ public enum ApiException { - UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401); + + UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401), + NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404), + REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408), + TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429), + INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500), + BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502), + SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503), + GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504); public final int httpStatusCode; private final ExceptionType type; private final String code; private final String details; + private final List<String> variables; ApiException(ExceptionType type, String code, String details, int httpStatusCode) { + this(type, code, details, new ArrayList<>(), httpStatusCode); + } + + ApiException(ExceptionType type, String code, String details, List<String> variables, int httpStatusCode) { this.type = type; this.code = code; this.details = details; + this.variables = variables; this.httpStatusCode = httpStatusCode; } @@ -45,6 +62,9 @@ public enum ApiException { JSONObject exceptionTypeNode = new JSONObject(); exceptionTypeNode.put("messageId", code); exceptionTypeNode.put("text", details); + if(!variables.isEmpty()) { + exceptionTypeNode.put("variables", variables); + } JSONObject requestErrorNode = new JSONObject(); requestErrorNode.put(type.toString(), exceptionTypeNode); @@ -55,7 +75,7 @@ public enum ApiException { } public enum ExceptionType { - POLICY_EXCEPTION; + SERVICE_EXCEPTION, POLICY_EXCEPTION; @Override public String toString() { diff --git a/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java new file mode 100755 index 0000000..fca296b --- /dev/null +++ b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.restconfcollector + * ================================================================================ + * Copyright (C) 2022 Huawei. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.dcae.configuration.ConfigurationHandler; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; + +@RunWith(MockitoJUnitRunner.class) +public class ApplicationConfigurationListenerTest { + + @Mock + private ConfigurationHandler configurationHandler; + + @InjectMocks + @Spy + private ApplicationConfigurationListener applicationConfigurationListener; + + @Test + public void shouldStopJobAndCloseConnectionWhenErrorOccurredDuringListenAtConfigChange() { + + // given + Mockito.doThrow(new RuntimeException("Simulate exception")).when(configurationHandler).startListen(any()); + + // when + applicationConfigurationListener.run(); + + // then + Mockito.verify(applicationConfigurationListener).stopListeningForConfigurationUpdates(any()); + } + + @Test + public void shouldSendReloadAction() { + + // when + applicationConfigurationListener.reload(Duration.ofMillis(1)); + + // then + Mockito.verify(applicationConfigurationListener).sendReloadAction(); + } +} diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java deleted file mode 100644 index 79ac03b..0000000 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.restconfcollector - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * Copyright (C) 2018-2019 Huawei. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcae.common.publishing; - -import static io.vavr.API.Option; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import org.json.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; - -public class DMaaPEventPublisherTest { - - private static final String STREAM_ID = "sampleStreamId"; - - private DMaaPEventPublisher eventPublisher; - private CambriaBatchingPublisher cambriaPublisher; - private DMaaPPublishersCache dmaapPublishersCache; - - /** - * Setup before test. - */ - @Before - public void setUp() { - cambriaPublisher = mock(CambriaBatchingPublisher.class); - dmaapPublishersCache = mock(DMaaPPublishersCache.class); - when(dmaapPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); - eventPublisher = new DMaaPEventPublisher(dmaapPublishersCache, mock(Logger.class)); - } - - @Test - public void shouldSendEventToTopic() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(cambriaPublisher).send("MyPartitionKey", event.toString()); - } - - - - @Test - public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(dmaapPublishersCache).closePublisherFor(STREAM_ID); - } -}
\ No newline at end of file diff --git a/version.properties b/version.properties index 7d6815b..fee4928 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=3 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT |