summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Changelog.md3
-rw-r--r--pom.xml41
-rwxr-xr-xsrc/main/java/org/onap/dcae/ApplicationConfigurationListener.java108
-rw-r--r--src/main/java/org/onap/dcae/ApplicationSettings.java7
-rw-r--r--src/main/java/org/onap/dcae/RestConfCollector.java113
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java12
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java123
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/EventPublisher.java4
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java111
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/publishing/Publisher.java74
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java6
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java142
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/ConfigParsing.java56
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/ConfigUpdater.java118
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java45
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/ConfigurationHandler.java114
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/Conversions.java47
-rwxr-xr-xsrc/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java85
-rw-r--r--src/main/java/org/onap/dcae/restapi/ApiException.java26
-rwxr-xr-xsrc/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java67
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java83
-rw-r--r--version.properties2
22 files changed, 1224 insertions, 163 deletions
diff --git a/Changelog.md b/Changelog.md
index d17a970..8c8d5c7 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [1.3.1] - 2022/01/20
+ - [DCAEGEN2-2857] - RESTConf - Switch CBS client library to 1.8.7 or higher
+
## [1.3.0] - 2022/01/18
- [DCAEGEN2-2962] - Switch RESTCONF Collector to Integration base image
- [DCAEGEN2-3022] - Remediation for Log4Shell vulnerability (upgrade to 2.17.1)
diff --git a/pom.xml b/pom.xml
index fce456e..d0ba10b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,11 +23,11 @@ limitations under the License.
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
<version>2.0.0</version>
- <relativePath/>
+ <relativePath />
</parent>
<groupId>org.onap.dcaegen2.collectors.restconf</groupId>
<artifactId>restconfcollector</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>1.3.1-SNAPSHOT</version>
<name>dcaegen2-collectors-restconf</name>
<description>RestConfCollector</description>
<properties>
@@ -48,9 +48,7 @@ limitations under the License.
<releases.path>content/repositories/releases/</releases.path>
<site.path>content/sites/site/org/onap/dcaegen2/collectors/restconf/${project.artifactId}/${project.version}</site.path>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
- <sonar.coverage.jacoco.xmlReportPaths>
- ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
- </sonar.coverage.jacoco.xmlReportPaths>
+ <sonar.coverage.jacoco.xmlReportPaths>${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
</properties>
<build>
<pluginManagement>
@@ -199,9 +197,9 @@ limitations under the License.
<serverId>${onap.nexus.dockerregistry.daily}</serverId>
<imageName>${onap.nexus.dockerregistry.daily}/${docker.image.name}</imageName>
<imageTags>
- <imageTag>${project.version}-${maven.build.timestamp}Z</imageTag>
- <imageTag>${project.version}</imageTag>
- <imageTag>latest</imageTag>
+ <imageTag>${project.version}-${maven.build.timestamp}Z</imageTag>
+ <imageTag>${project.version}</imageTag>
+ <imageTag>latest</imageTag>
</imageTags>
<dockerDirectory>${project.basedir}/src/main/docker</dockerDirectory>
<resources>
@@ -314,13 +312,18 @@ limitations under the License.
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.8.5</version>
+ <version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>30.1-jre</version>
+ </dependency>
<!-- REST API RELATED -->
<dependency>
<groupId>com.att.nsa</groupId>
@@ -401,7 +404,7 @@ limitations under the License.
<version>2.10.5</version>
<scope>compile</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.27</version>
@@ -505,6 +508,22 @@ limitations under the License.
<artifactId>crypt-password</artifactId>
<version>1.3.1</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ <version>1.8.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
+ <version>1.8.7</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<repositories>
<repository>
@@ -525,4 +544,4 @@ limitations under the License.
<url>http://maven.restlet.com</url>
</pluginRepository>
</pluginRepositories>
-</project>
+</project> \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java
new file mode 100755
index 0000000..5975df5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/ApplicationConfigurationListener.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+package org.onap.dcae;
+
+import org.onap.dcae.configuration.ConfigurationHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+
+import java.time.Duration;
+
+/**
+ * ApplicationConfigurationListener is used to listen at notifications with configuration updates.
+ */
+public class ApplicationConfigurationListener implements Runnable {
+
+ private static Logger log = LoggerFactory.getLogger(ApplicationConfigurationListener.class);
+
+ private Duration interval;
+ private boolean terminate = false;
+ private final ConfigurationHandler configurationHandler;
+
+ /**
+ * Constructor
+ * @param interval defines period of time when notification can come
+ * @param configurationHandler handles notifications
+ */
+ public ApplicationConfigurationListener(Duration interval, ConfigurationHandler configurationHandler) {
+ this.interval = interval;
+ this.configurationHandler = configurationHandler;
+ }
+
+ /**
+ * Reload listener to start listening for configurations notifications with defined interval.
+ * @param interval defines period of time when notification can come
+ */
+ public synchronized void reload(Duration interval) {
+ this.interval = interval;
+ log.info("Handler configuration was changed. Need to reload configuration handler.");
+ sendReloadAction();
+ }
+
+ /**
+ * Send reload action listener to notifications all.
+ */
+ synchronized void sendReloadAction() {
+ this.notifyAll();
+ }
+
+ /**
+ * Start listening for configurations notification.
+ */
+ @Override
+ public void run() {
+ Disposable configListener = null;
+ do {
+ try {
+ configListener = listenForConfigurationUpdates();
+ synchronized (this) {
+ log.info("Switch to configuration handler thread. Active waiting for configuration.");
+ this.wait();
+ }
+ } catch (Exception e) {
+ log.error("Unexpected error occurred during handling data.", e);
+ terminate();
+ } finally {
+ stopListeningForConfigurationUpdates(configListener);
+ }
+ } while (!this.terminate);
+ }
+
+ private Disposable listenForConfigurationUpdates() {
+ return this.configurationHandler.startListen(this.interval);
+ }
+
+ void terminate() {
+ this.terminate = true;
+ }
+
+ /**
+ * Release resources when there is a need to stop listener
+ * @param consulListener Handler to configurations listener
+ */
+ void stopListeningForConfigurationUpdates(Disposable consulListener) {
+ if (consulListener != null) {
+ consulListener.dispose();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/ApplicationSettings.java b/src/main/java/org/onap/dcae/ApplicationSettings.java
index 9caf2f6..0dcec98 100644
--- a/src/main/java/org/onap/dcae/ApplicationSettings.java
+++ b/src/main/java/org/onap/dcae/ApplicationSettings.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ import java.util.Iterator;
public class ApplicationSettings {
private static final Logger log = LoggerFactory.getLogger(ApplicationSettings.class);
-
+ public static String responseCompatibility;
private final String appInvocationDir;
private final String configurationFileLocation;
private final PropertiesConfiguration properties = new PropertiesConfiguration();
@@ -175,6 +175,9 @@ public class ApplicationSettings {
return prependWithUserDirOnRelative(properties.getString("collector.eventinfo", "etc/ont_config.json"));
}
+ public int configurationUpdateFrequency() {
+ return properties.getInt("collector.dynamic.config.update.frequency", 5);
+ }
public String dMaaPStreamsMapping() {
return properties.getString("collector.rcc.dmaap.streamid", null);
diff --git a/src/main/java/org/onap/dcae/RestConfCollector.java b/src/main/java/org/onap/dcae/RestConfCollector.java
index 545bfd4..ff749f3 100644
--- a/src/main/java/org/onap/dcae/RestConfCollector.java
+++ b/src/main/java/org/onap/dcae/RestConfCollector.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,12 +26,11 @@ import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dcae.common.ControllerActivationState;
import org.onap.dcae.common.EventData;
-import org.onap.dcae.common.EventProcessor;
import org.onap.dcae.common.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.common.publishing.DMaaPEventPublisher;
import org.onap.dcae.common.publishing.EventPublisher;
import org.onap.dcae.common.publishing.PublisherConfig;
import org.onap.dcae.controller.AccessController;
-import org.onap.dcae.controller.ConfigLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
@@ -39,61 +38,98 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
+import org.onap.dcae.configuration.ConfigurationHandler;
+import org.onap.dcae.configuration.ConfigUpdater;
+import org.onap.dcae.configuration.ConfigUpdaterFactory;
+import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
@SpringBootApplication(exclude = {GsonAutoConfiguration.class, SecurityAutoConfiguration.class})
public class RestConfCollector {
private static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.common.output");
- private static final int MAX_THREADS = 20;
+ private static final int DEFAULT_CONFIGURATION_FETCH_PERIOD = 5;
private static Logger log = LoggerFactory.getLogger(RestConfCollector.class);
public static LinkedBlockingQueue<EventData> fProcessingInputQueue;
private static ApplicationSettings properties;
private static ConfigurableApplicationContext context;
- private static ConfigLoader configLoader;
private static SpringApplication app;
- private static ScheduledFuture<?> scheduleFeatures;
- private static ScheduledFuture<?> scheduleCtrlActivation;
- private static ExecutorService executor;
private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private static ScheduledThreadPoolExecutor scheduledThExController;
private static EventPublisher eventPublisher;
- private static EventProcessor eventProcessor;
+ private static ConfigUpdater configUpdater;
+ private static ApplicationConfigurationListener applicationConfigurationListener;
+ private static ReentrantLock applicationLock = new ReentrantLock();
/* List of Controllers */
private static java.util.Map<String, AccessController> controllerStore = new ConcurrentHashMap<>();
public static void main(String[] args) {
+ applicationLock.lock();
+ try {
+ startApplication(args);
+ startListeningForApplicationConfigurationStoredInConsul();
+ } finally {
+ applicationLock.unlock();
+ }
+ }
+
+ private static void startApplication(String[] args) {
oplog.info("RestconfController starting");
app = new SpringApplication(RestConfCollector.class);
properties = new ApplicationSettings(args, CLIUtils::processCmdLine);
+ configUpdater = ConfigUpdaterFactory.create(
+ properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThExController = new ScheduledThreadPoolExecutor(1);
init();
+ applicationConfigurationListener = startListeningForApplicationConfigurationStoredInConsul();
app.setAddCommandLineProperties(true);
context = app.run();
controllerConfig(properties);
- configLoader.updateConfig();
oplog.info("RestConfController running .....");
}
+ private static ApplicationConfigurationListener startListeningForApplicationConfigurationStoredInConsul() {
+ ConfigurationHandler cbsHandler = new ConfigurationHandler(new CbsClientConfigurationProvider(), configUpdater);
+ ApplicationConfigurationListener applicationConfigProvider = new ApplicationConfigurationListener(Duration.ofMinutes(DEFAULT_CONFIGURATION_FETCH_PERIOD), cbsHandler);
+
+ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
+ scheduledThreadPoolExecutor.execute(applicationConfigProvider);
+
+ return applicationConfigProvider;
+ }
public static void restartApplication() {
Thread thread = new Thread(() -> {
- controllerConfigCleanup();
- context.close();
- properties.reloadProperties();
- scheduleFeatures.cancel(true);
- scheduleCtrlActivation.cancel(true);
- init();
- controllerConfig(properties);
- context = SpringApplication.run(RestConfCollector.class);
+ try {
+ applicationLock.lock();
+ controllerConfigCleanup();
+ if (context != null) {
+ context.close();
+ }
+ properties.reloadProperties();
+ init();
+ controllerConfig(properties);
+ context = SpringApplication.run(RestConfCollector.class);
+
+ configUpdater.setPaths(properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
+ applicationConfigurationListener.reload(Duration.ofMinutes(properties.configurationUpdateFrequency()));
+ } finally {
+ applicationLock.unlock();
+ }
});
thread.setDaemon(false);
thread.start();
@@ -102,9 +138,11 @@ public class RestConfCollector {
private static void init() {
fProcessingInputQueue = new LinkedBlockingQueue<>(properties.maximumAllowedQueuedEvents());
- createConfigLoader();
- createSchedulePoolExecutor();
- createExecutors();
+
+ configUpdater = ConfigUpdaterFactory.create(
+ properties.configurationFileLocation(),
+ Paths.get(properties.dMaaPConfigurationFileLocation()));
+ eventPublisher = new DMaaPEventPublisher(getDmapConfig());
}
private static Map<String, PublisherConfig> getDmapConfig() {
@@ -170,41 +208,10 @@ public class RestConfCollector {
log.info("RestConfCollector.handleEvents:EVENTS has been published successfully!");
}
- private static void createConfigLoader() {
- log.info("dMaaPConfigurationFileLocation " + properties.dMaaPConfigurationFileLocation() + " " + properties.configurationFileLocation());
-
- configLoader = ConfigLoader.create(getEventPublisher()::reconfigure,
- Paths.get(properties.dMaaPConfigurationFileLocation()),
- properties.configurationFileLocation());
- }
-
private static EventPublisher getEventPublisher() {
return EventPublisher.createPublisher(oplog, getDmapConfig());
}
- private static void createSchedulePoolExecutor() {
- scheduleFeatures = scheduledThreadPoolExecutor.scheduleAtFixedRate(configLoader::updateConfig,
- 10,
- 10,
- TimeUnit.MINUTES);
- ControllerActivationTask task = new ControllerActivationTask();
- scheduleCtrlActivation = scheduledThExController.scheduleAtFixedRate(task,
- 10,
- 10,
- TimeUnit.SECONDS);
- }
-
- private static void createExecutors() {
- eventPublisher = EventPublisher.createPublisher(oplog, getDmapConfig());
- eventProcessor = new EventProcessor(eventPublisher,
- parseStreamIdToStreamHashMapping(properties.dMaaPStreamsMapping()));
-
- executor = Executors.newFixedThreadPool(MAX_THREADS);
- for (int i = 0; i < MAX_THREADS; ++i) {
- executor.execute(eventProcessor);
- }
- }
-
private static class ControllerActivationTask implements Runnable
{
public ControllerActivationTask() {
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 1209b38..e023c57 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,16 +36,14 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
-class DMaaPEventPublisher implements EventPublisher {
+public final class DMaaPEventPublisher implements EventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
- private final Logger outputLogger;
+ private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");;
- DMaaPEventPublisher(DMaaPPublishersCache publishersCache,
- Logger outputLogger) {
- this.publishersCache = publishersCache;
- this.outputLogger = outputLogger;
+ public DMaaPEventPublisher(Map<String, PublisherConfig> publishersCache) {
+ this.publishersCache = new DMaaPPublishersCache(publishersCache);
}
@Override
diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
new file mode 100755
index 0000000..5955a9d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+
+/**
+ * DmaapRequestConfiguration is DMaap request configuration.
+ */
+public class DmaapRequestConfiguration {
+
+ private static final Long TIMEOUT_SECONDS = 10L;
+ private static final int RETRY_INTERVAL_IN_SECONDS = 1;
+ private static final int RETRY_COUNT = 1;
+
+ private DmaapRequestConfiguration() {
+ }
+
+ /**
+ * Create publish request is DMaap request configuration.
+ * @param publisherConfig publisher configuration
+ * @param timeout timeout configuration
+ * return message reouter publish request
+ */
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
+ String topicUrl = createUrl(publisherConfig);
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(createMessageRouterSink(topicUrl))
+ .contentType(ContentType.APPLICATION_JSON)
+ .timeoutConfig(timeOutConfiguration(timeout))
+ .build();
+ }
+
+ /**
+ * Create publish request is DMaap request configuration.
+ * @param publisherConfig publisher configuration
+ * return message reouter publish request
+ */
+ static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
+ return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
+ }
+
+ /**
+ * Convert JSON object list.
+ * @param messages list of messages.
+ * return flux jsonobject list of messages.
+ */
+ static Flux<JsonObject> jsonBatch(List<String> messages) {
+ return Flux.fromIterable(getAsJsonObjects(messages));
+ }
+
+ /**
+ * Retry configuration.
+ * return MessageRouterPublisherConfig message router publish coinfiguration.
+ */
+ static MessageRouterPublisherConfig retryConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .retryConfig(ImmutableDmaapRetryConfig.builder()
+ .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
+ .retryCount(RETRY_COUNT)
+ .build())
+ .build();
+ }
+
+ private static String createUrl(Option<PublisherConfig> publisherConfig) {
+ String hostAndPort = publisherConfig.get().getHostAndPort();
+ String topicName = publisherConfig.get().topic();
+ return String.format("http://%s/events/%s/",hostAndPort,topicName);
+ }
+
+ private static List<JsonObject> getAsJsonObjects(List<String> messages) {
+ return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+ }
+
+ static List<JsonElement> getAsJsonElements(List<String> messages) {
+ return messages.map(JsonParser::parseString);
+ }
+
+ static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+ return ImmutableMessageRouterSink.builder()
+ .name("the topic")
+ .topicUrl(topicUrl)
+ .build();
+ }
+
+ @NotNull
+ private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
+ return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
index 91736ec..8042215 100644
--- a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
public interface EventPublisher {
static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
- return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ return new DMaaPEventPublisher(dMaaPConfig);
}
void sendEvent(JSONObject event, String domain);
diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
new file mode 100755
index 0000000..4806127
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
@@ -0,0 +1,111 @@
+/*
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import java.util.Objects;
+
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+
+/**
+ * MessageRouterHttpStatusMapper is responsible for HTTP status mapper.
+ */
+public class MessageRouterHttpStatusMapper {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class);
+
+ private MessageRouterHttpStatusMapper() {
+ }
+
+ /**
+ * Get http status.
+ * @param messageRouterPublishResponse message reouter publish response.
+ */
+ @NotNull
+ static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return responseCompatibility.equals("v7.2") ?
+ getHttpStatusBackwardsCompatibility(messageRouterPublishResponse):
+ getHttpStatusWithMappedResponseCode(messageRouterPublishResponse);
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.ACCEPTED;
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ }
+ }
+
+ @NotNull
+ private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ if (isHttpOk(messageRouterPublishResponse)) {
+ log.info("Successfully send event to MR");
+ return HttpStatus.OK;
+ } else if (isHttp413(messageRouterPublishResponse)) {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ } else {
+ log.error(messageRouterPublishResponse.failReason());
+ throw new RuntimeException();
+ }
+ }
+
+ @NotNull
+ private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3);
+ }
+
+ @NotNull
+ private static ApiException responseBody(String substring) {
+ switch (substring) {
+ case "404":
+ return ApiException.NOT_FOUND;
+ case "408":
+ return ApiException.REQUEST_TIMEOUT;
+ case "429":
+ return ApiException.TOO_MANY_REQUESTS;
+ case "502":
+ return ApiException.BAD_GATEWAY;
+ case "503":
+ return ApiException.SERVICE_UNAVAILABLE;
+ case "504":
+ return ApiException.GATEWAY_TIMEOUT;
+ default:
+ return ApiException.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return messageRouterPublishResponse.successful();
+ }
+
+ private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) {
+ return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413");
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
new file mode 100755
index 0000000..6f69f4f
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
+
+/**
+ * Publisher is responsible publish events.
+ */
+public class Publisher {
+
+ private final MessageRouterPublisher publisher;
+
+ /**
+ * Constructor
+ */
+ public Publisher() {
+ this(retryConfiguration());
+ }
+
+ /**
+ * Constructor
+ * @param messageRouterPublisherConfig message router publish configuration
+ */
+ public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
+ publisher = DmaapClientFactory
+ .createMessageRouterPublisher(messageRouterPublisherConfig);
+ }
+
+ /**
+ * Publish event
+ *
+ * @param events list of ves events prepared to send
+ * @param publisherConfig publisher configuration
+ * @return flux containing information about the success or failure of the event publication
+ */
+ public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
+ return publishEvents(events, createPublishRequest(publisherConfig));
+ }
+
+ Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
+ return publisher.put(publishRequest, jsonMessageBatch);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
index 67aca1d..57210c4 100644
--- a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
+++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,6 +51,10 @@ public final class PublisherConfig {
return destinations;
}
+ String getHostAndPort(){
+ return destinations.get(0);
+ }
+
String topic() {
return topic;
}
diff --git a/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java
new file mode 100755
index 0000000..0a8c089
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/ConfigFilesFacade.java
@@ -0,0 +1,142 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+import static org.onap.dcae.common.publishing.VavrUtils.logError;
+import static org.onap.dcae.configuration.Conversions.toList;
+
+import io.vavr.CheckedRunnable;
+import io.vavr.Tuple2;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ConfigFilesFacade is used for reading and writing application properties and dmaap configuration.
+ */
+public class ConfigFilesFacade {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigFilesFacade.class);
+
+ private Path dmaapConfigPath;
+ private Path propertiesPath;
+
+ ConfigFilesFacade(Path propertiesPath, Path dMaaPConfigPath) {
+ this.propertiesPath = propertiesPath;
+ this.dmaapConfigPath = dMaaPConfigPath;
+ }
+
+ /**
+ * Set new paths
+ * @param propertiesFile application property file
+ * @param dmaapConfigFile dmaap configuration file
+ */
+ public void setPaths(Path propertiesFile, Path dmaapConfigFile) {
+ this.propertiesPath = propertiesFile;
+ this.dmaapConfigPath = dmaapConfigFile;
+ }
+
+ Try<Map<String, String>> readCollectorProperties() {
+ log.info(f("Reading collector properties from path: '%s'", propertiesPath));
+ return Try(this::readProperties)
+ .map(prop -> toList(prop.getKeys()).toMap(k -> k, k -> (String) prop.getProperty(k)))
+ .mapFailure(enhanceError("Unable to read properties configuration from path '%s'", propertiesPath))
+ .onFailure(logError(log))
+ .peek(props -> log.info(f("Read following collector properties: '%s'", props)));
+ }
+
+ Try<JSONObject> readDMaaPConfiguration() {
+ log.info(f("Reading DMaaP configuration from file: '%s'", dmaapConfigPath));
+ return readFile(dmaapConfigPath)
+ .recover(FileNotFoundException.class, __ -> "{}")
+ .mapFailure(enhanceError("Unable to read DMaaP configuration from file '%s'", dmaapConfigPath))
+ .flatMap(Conversions::toJson)
+ .onFailure(logError(log))
+ .peek(props -> log.info(f("Read following DMaaP properties: '%s'", props)));
+ }
+
+ Try<Void> writeDMaaPConfiguration(JSONObject dMaaPConfiguration) {
+ log.info(f("Writing DMaaP configuration '%s' into file '%s'", dMaaPConfiguration, dmaapConfigPath));
+ return writeFile(dmaapConfigPath, indentConfiguration(dMaaPConfiguration.toString()))
+ .mapFailure(enhanceError("Could not save new DMaaP configuration to path '%s'", dmaapConfigPath))
+ .onFailure(logError(log))
+ .peek(__ -> log.info("Written successfully"));
+ }
+
+
+ Try<Void> writeProperties(Map<String, String> properties) {
+ log.info(f("Writing properties configuration '%s' into file '%s'", properties, propertiesPath));
+ return Try.run(saveProperties(properties))
+ .mapFailure(enhanceError("Could not save properties to path '%s'", properties))
+ .onFailure(logError(log))
+ .peek(__ -> log.info("Written successfully"));
+ }
+
+ private Try<String> readFile(Path path) {
+ return Try(() -> new String(Files.readAllBytes(path), StandardCharsets.UTF_8))
+ .mapFailure(enhanceError("Could not read content from path: '%s'", path));
+ }
+
+ private Try<Void> writeFile(Path path, String content) {
+ return Try.run(() -> Files.write(path, content.getBytes()))
+ .mapFailure(enhanceError("Could not write content to path: '%s'", path));
+ }
+
+ private PropertiesConfiguration readProperties() throws ConfigurationException {
+ PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration();
+ propertiesConfiguration.setDelimiterParsingDisabled(true);
+ propertiesConfiguration.load(propertiesPath.toFile());
+ return propertiesConfiguration;
+ }
+
+ private CheckedRunnable saveProperties(Map<String, String> properties) {
+ return () -> {
+ PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(propertiesPath.toFile());
+ propertiesConfiguration.setEncoding(null);
+ for (Tuple2<String, String> property : properties) {
+ updateProperty(propertiesConfiguration, property);
+ }
+ propertiesConfiguration.save();
+ };
+ }
+
+ private void updateProperty(PropertiesConfiguration propertiesConfiguration, Tuple2<String, String> property) {
+ if (propertiesConfiguration.containsKey(property._1)) {
+ propertiesConfiguration.setProperty(property._1, property._2);
+ } else {
+ propertiesConfiguration.addProperty(property._1, property._2);
+ }
+ }
+
+ private String indentConfiguration(String configuration) {
+ return new JSONObject(configuration).toString(4);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/ConfigParsing.java b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java
new file mode 100755
index 0000000..7b2a656
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/ConfigParsing.java
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+import static org.onap.dcae.configuration.Conversions.toList;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+interface ConfigParsing {
+
+ Logger log = LoggerFactory.getLogger(ConfigParsing.class);
+
+ static Option<JSONObject> getDMaaPConfig(JSONObject configuration) {
+ log.info(f("Getting DMaaP configuration from app configuration: '%s'", configuration));
+ return toList(configuration.toMap().entrySet().iterator())
+ .filter(t -> t.getKey().startsWith("streams_publishes"))
+ .headOption()
+ .flatMap(e -> Try(() -> configuration.getJSONObject(e.getKey())).toOption())
+ .onEmpty(() -> log.warn(f("App configuration '%s' is missing DMaaP configuration ('streams_publishes' key) "
+ + "or DMaaP configuration is not a valid json document", configuration)))
+ .peek(dMaaPConf -> log.info(f("Found following DMaaP configuration: '%s'", dMaaPConf)));
+ }
+
+ static Map<String, String> getProperties(JSONObject configuration) {
+ log.debug(f("Getting properties configuration from app configuration: '%s'", configuration));
+ Map<String, String> confEntries = toList(configuration.toMap().entrySet().iterator())
+ .toMap(e -> Tuple(e.getKey(), String.valueOf(e.getValue())))
+ .filterKeys(e -> !e.startsWith("streams_publishes"));
+ log.debug(f("Found following app properties: '%s'", confEntries));
+ return confEntries;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java
new file mode 100755
index 0000000..0d9e660
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdater.java
@@ -0,0 +1,118 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * ConfigUpdater is responsible for receiving configuration updates from config file.
+ */
+public class ConfigUpdater {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigUpdater.class);
+ private final ConfigFilesFacade configFilesFacade;
+ private final Runnable applicationRestarter;
+ private boolean isApplicationRestartNeeded;
+
+ /**
+ * Constructor
+ * @param configFilesFacade provides configuration files facade
+ * @param applicationRestarter for restart application
+ */
+ public ConfigUpdater(ConfigFilesFacade configFilesFacade, Runnable applicationRestarter) {
+ this.configFilesFacade = configFilesFacade;
+ this.applicationRestarter = applicationRestarter;
+ this.isApplicationRestartNeeded = false;
+ }
+
+ /**
+ * Set new paths
+ * @param propertiesFile application property file
+ * @param dmaapConfigFile dmaap configuration file
+ */
+ public void setPaths(Path propertiesFile, Path dmaapConfigFile){
+ this.configFilesFacade.setPaths(propertiesFile, dmaapConfigFile);
+
+ }
+ /**
+ * update configuration
+ * @param appConfig JSON object list
+ */
+ public synchronized void updateConfig(Option<JSONObject> appConfig) {
+ appConfig.peek(this::handleUpdate).onEmpty(logSkipMessage());
+ }
+
+ private Runnable logSkipMessage() {
+ return () -> log.info("Skipping dynamic configuration");
+ }
+
+ private void handleUpdate(JSONObject appConfig) {
+ updatePropertiesIfChanged(appConfig);
+ updateDmaapConfigIfChanged(appConfig);
+ restartApplicationIfNeeded();
+ }
+
+ private void updatePropertiesIfChanged(JSONObject appConfig) {
+ Map<String, String> newProperties = ConfigParsing.getProperties(appConfig);
+ Map<String, String> oldProperties = configFilesFacade.readCollectorProperties().get();
+
+ if (!areCommonPropertiesSame(oldProperties, newProperties)) {
+ configFilesFacade.writeProperties(newProperties);
+ isApplicationRestartNeeded = true;
+ }
+ }
+
+ private boolean areCommonPropertiesSame(Map<String, String> oldProperties, Map<String, String> newProperties) {
+ Map<String, String> filteredOldProperties = filterIntersectingKeys(oldProperties, newProperties);
+ return filteredOldProperties.equals(newProperties);
+ }
+
+ private Map<String, String> filterIntersectingKeys(Map<String, String> primaryProperties,
+ Map<String, String> otherProperties) {
+ return primaryProperties.filterKeys(key -> containsKey(key, otherProperties));
+ }
+
+ private boolean containsKey(String key, Map<String, String> properties) {
+ return properties.keySet().contains(key);
+ }
+
+ private void updateDmaapConfigIfChanged(JSONObject appConfig) {
+ JSONObject oldDmaapConfig = configFilesFacade.readDMaaPConfiguration().get();
+ JSONObject newDmaapConfig = ConfigParsing.getDMaaPConfig(appConfig).get();
+
+ if (!oldDmaapConfig.similar(newDmaapConfig)) {
+ configFilesFacade.writeDMaaPConfiguration(newDmaapConfig);
+ isApplicationRestartNeeded = true;
+ }
+ }
+
+ private void restartApplicationIfNeeded() {
+ if (isApplicationRestartNeeded) {
+ applicationRestarter.run();
+ isApplicationRestartNeeded = false;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java
new file mode 100755
index 0000000..7fc4532
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/ConfigUpdaterFactory.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import java.nio.file.Path;
+
+import org.onap.dcae.RestConfCollector;
+
+/**
+ * ConfigUpdaterFactory is responsible for receiving configuration from config file or Consul (if config file doesn't exist).
+ */
+public class ConfigUpdaterFactory {
+
+ private ConfigUpdaterFactory() {
+ }
+
+ /**
+ * create configuration updater based on property file and dmaap configuration files
+ * @param propertiesFile application property file
+ * @param dmaapConfigFile dmaap configuration file
+ */
+ public static ConfigUpdater create(Path propertiesFile, Path dmaapConfigFile) {
+ ConfigFilesFacade configFilesFacade = new ConfigFilesFacade(propertiesFile, dmaapConfigFile);
+ return new ConfigUpdater(
+ configFilesFacade,
+ RestConfCollector::restartApplication);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java
new file mode 100755
index 0000000..9495308
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/ConfigurationHandler.java
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import com.google.gson.JsonObject;
+import io.vavr.control.Option;
+import org.json.JSONObject;
+import org.onap.dcae.configuration.cbs.CbsClientConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+
+/**
+ * ConfigurationHandler is responsible for receiving configuration updates from config file or Consul (if config file doesn't exist).
+ * Any change made in the configuration will be reported as a notification.
+ */
+public class ConfigurationHandler {
+
+ private static Logger log = LoggerFactory.getLogger(ConfigurationHandler.class);
+ private static final String CONFIG_DICT = "config";
+
+ private final CbsClientConfigurationProvider cbsClientConfigurationProvider;
+ private final ConfigUpdater configUpdater;
+
+ /**
+ * Constructor
+ * @param cbsClientConfigurationProvider provides configuration to connect with Consul
+ * @param configUpdater for updating application configuration
+ */
+ public ConfigurationHandler(CbsClientConfigurationProvider cbsClientConfigurationProvider, ConfigUpdater configUpdater) {
+ this.cbsClientConfigurationProvider = cbsClientConfigurationProvider;
+ this.configUpdater = configUpdater;
+ }
+
+ /**
+ * Start listen for application configuration notifications with configuration changes
+ * @param interval defines period of time when notification can come
+ * @return {@link Disposable} handler to close configuration listener at the end
+ */
+ public Disposable startListen(Duration interval) {
+
+ log.info("Start listening for configuration ...");
+ log.info(String.format("Configuration will be fetched in %s period.", interval));
+
+ // Polling properties
+ final Duration initialDelay = Duration.ofSeconds(5);
+ final Duration period = interval;
+
+ final CbsRequest request = createCbsRequest();
+ final CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationProvider.get();
+
+ return createCbsClient(cbsClientConfiguration)
+ .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period))
+ .subscribe(
+ this::handleConfiguration,
+ this::handleError
+ );
+ }
+
+ /**
+ * Create CBS Client configuration
+ * @param accept cbsClientConfiguration
+ * @return return cbs client
+ */
+ Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
+ return CbsClientFactory.createCbsClient(cbsClientConfiguration);
+ }
+
+ void handleConfiguration(JsonObject jsonObject) {
+ log.info("Configuration update {}", jsonObject);
+ if(jsonObject.has(CONFIG_DICT)) {
+ JsonObject config = jsonObject.getAsJsonObject(CONFIG_DICT);
+ JSONObject jObject = new JSONObject(config.toString());
+ configUpdater.updateConfig(Option.of(jObject));
+ } else {
+ throw new IllegalArgumentException(String.format("Invalid application configuration: %s ", jsonObject));
+ }
+ }
+
+ private void handleError(Throwable throwable) {
+ log.error("Unexpected error occurred during fetching configuration", throwable);
+ }
+
+ private CbsRequest createCbsRequest() {
+ RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ return CbsRequests.getAll(diagnosticContext);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/Conversions.java b/src/main/java/org/onap/dcae/configuration/Conversions.java
new file mode 100755
index 0000000..98f632c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/Conversions.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration;
+
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+
+import io.vavr.API;
+import io.vavr.collection.List;
+import io.vavr.control.Try;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+import org.json.JSONObject;
+
+/**
+ * Conversions is used to convert to JSON Object.
+ */
+interface Conversions {
+
+ static Try<JSONObject> toJson(String strBody) {
+ return API.Try(() -> new JSONObject(strBody))
+ .mapFailure(enhanceError("Value '%s' is not a valid JSON document", strBody));
+ }
+
+ static <T> List<T> toList(Iterator<T> iterator) {
+ return List
+ .ofAll(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java
new file mode 100755
index 0000000..b808004
--- /dev/null
+++ b/src/main/java/org/onap/dcae/configuration/cbs/CbsClientConfigurationProvider.java
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.configuration.cbs;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CbsClientConfigurationProvider is used to provide production or dev configuration for CBS client.
+ */
+public class CbsClientConfigurationProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationProvider.class);
+
+ private static final String DEFAULT_PROTOCOL = "http";
+ private static final String DEFAULT_HOSTNAME = "config-binding-service";
+ private static final int DEFAULT_PORT = 10000;
+ private static final String DEFAULT_APP_NAME = "dcae-ves-collector";
+ private static final String DEV_MODE_PROPERTY = "devMode";
+ private static final String CBS_PORT_PROPERTY = "cbsPort";
+
+ /**
+ * Returns configuration for CBS client.
+ * @return Production or dev configuration for CBS client, depends on application run arguments.
+ */
+ public CbsClientConfiguration get() {
+ try {
+ if (isDevModeEnabled()) {
+ return getDevConfiguration();
+ } else {
+ return CbsClientConfiguration.fromEnvironment();
+ }
+ } catch (Exception e) {
+ LOGGER.warn(String.format("Failed resolving CBS client configuration from system environments: %s", e));
+ }
+ return getFallbackConfiguration();
+ }
+
+ @NotNull
+ private ImmutableCbsClientConfiguration getDevConfiguration() {
+ return createCbsClientConfiguration(
+ DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME,
+ Integer.parseInt(System.getProperty(CBS_PORT_PROPERTY, String.valueOf(DEFAULT_PORT)))
+ );
+ }
+
+ private boolean isDevModeEnabled() {
+ return System.getProperty(DEV_MODE_PROPERTY) != null;
+ }
+
+ private ImmutableCbsClientConfiguration getFallbackConfiguration() {
+ LOGGER.info("Falling back to use default CBS client configuration");
+ return createCbsClientConfiguration(DEFAULT_PROTOCOL, DEFAULT_HOSTNAME, DEFAULT_APP_NAME, DEFAULT_PORT);
+ }
+
+ private ImmutableCbsClientConfiguration createCbsClientConfiguration(String protocol, String hostname,
+ String appName, Integer port) {
+ return ImmutableCbsClientConfiguration.builder()
+ .protocol(protocol)
+ .hostname(hostname)
+ .port(port)
+ .appName(appName)
+ .build();
+ }
+}
diff --git a/src/main/java/org/onap/dcae/restapi/ApiException.java b/src/main/java/org/onap/dcae/restapi/ApiException.java
index d8c21b1..426ff87 100644
--- a/src/main/java/org/onap/dcae/restapi/ApiException.java
+++ b/src/main/java/org/onap/dcae/restapi/ApiException.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.restconfcollector
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * Copyright (C) 2018-2022 Huawei. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,21 +23,38 @@ package org.onap.dcae.restapi;
import com.google.common.base.CaseFormat;
import org.json.JSONObject;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @author Pawel Szalapski (pawel.szalapski@nokia.com)
*/
public enum ApiException {
- UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401);
+
+ UNAUTHORIZED_USER(ExceptionType.POLICY_EXCEPTION, "POL2000", "Unauthorized user", 401),
+ NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404),
+ REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408),
+ TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429),
+ INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500),
+ BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502),
+ SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503),
+ GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504);
public final int httpStatusCode;
private final ExceptionType type;
private final String code;
private final String details;
+ private final List<String> variables;
ApiException(ExceptionType type, String code, String details, int httpStatusCode) {
+ this(type, code, details, new ArrayList<>(), httpStatusCode);
+ }
+
+ ApiException(ExceptionType type, String code, String details, List<String> variables, int httpStatusCode) {
this.type = type;
this.code = code;
this.details = details;
+ this.variables = variables;
this.httpStatusCode = httpStatusCode;
}
@@ -45,6 +62,9 @@ public enum ApiException {
JSONObject exceptionTypeNode = new JSONObject();
exceptionTypeNode.put("messageId", code);
exceptionTypeNode.put("text", details);
+ if(!variables.isEmpty()) {
+ exceptionTypeNode.put("variables", variables);
+ }
JSONObject requestErrorNode = new JSONObject();
requestErrorNode.put(type.toString(), exceptionTypeNode);
@@ -55,7 +75,7 @@ public enum ApiException {
}
public enum ExceptionType {
- POLICY_EXCEPTION;
+ SERVICE_EXCEPTION, POLICY_EXCEPTION;
@Override
public String toString() {
diff --git a/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java
new file mode 100755
index 0000000..fca296b
--- /dev/null
+++ b/src/test/java/org/onap/dcae/ApplicationConfigurationListenerTest.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2022 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.dcae.configuration.ConfigurationHandler;
+
+import java.time.Duration;
+
+import static org.mockito.ArgumentMatchers.any;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ApplicationConfigurationListenerTest {
+
+ @Mock
+ private ConfigurationHandler configurationHandler;
+
+ @InjectMocks
+ @Spy
+ private ApplicationConfigurationListener applicationConfigurationListener;
+
+ @Test
+ public void shouldStopJobAndCloseConnectionWhenErrorOccurredDuringListenAtConfigChange() {
+
+ // given
+ Mockito.doThrow(new RuntimeException("Simulate exception")).when(configurationHandler).startListen(any());
+
+ // when
+ applicationConfigurationListener.run();
+
+ // then
+ Mockito.verify(applicationConfigurationListener).stopListeningForConfigurationUpdates(any());
+ }
+
+ @Test
+ public void shouldSendReloadAction() {
+
+ // when
+ applicationConfigurationListener.reload(Duration.ofMillis(1));
+
+ // then
+ Mockito.verify(applicationConfigurationListener).sendReloadAction();
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
deleted file mode 100644
index 79ac03b..0000000
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.restconfcollector
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * Copyright (C) 2018-2019 Huawei. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common.publishing;
-
-import static io.vavr.API.Option;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import java.io.IOException;
-import org.json.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-public class DMaaPEventPublisherTest {
-
- private static final String STREAM_ID = "sampleStreamId";
-
- private DMaaPEventPublisher eventPublisher;
- private CambriaBatchingPublisher cambriaPublisher;
- private DMaaPPublishersCache dmaapPublishersCache;
-
- /**
- * Setup before test.
- */
- @Before
- public void setUp() {
- cambriaPublisher = mock(CambriaBatchingPublisher.class);
- dmaapPublishersCache = mock(DMaaPPublishersCache.class);
- when(dmaapPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
- eventPublisher = new DMaaPEventPublisher(dmaapPublishersCache, mock(Logger.class));
- }
-
- @Test
- public void shouldSendEventToTopic() throws Exception {
- // given
- JSONObject event = new JSONObject("{}");
-
- // when
- eventPublisher.sendEvent(event, STREAM_ID);
-
- // then
- verify(cambriaPublisher).send("MyPartitionKey", event.toString());
- }
-
-
-
- @Test
- public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
- // given
- JSONObject event = new JSONObject("{}");
- given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
-
- // when
- eventPublisher.sendEvent(event, STREAM_ID);
-
- // then
- verify(dmaapPublishersCache).closePublisherFor(STREAM_ID);
- }
-} \ No newline at end of file
diff --git a/version.properties b/version.properties
index 7d6815b..fee4928 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=1
minor=3
-patch=0
+patch=1
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT