aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
authorYongchaoWu <yongchao.wu@est.tech>2019-07-12 09:01:01 +0000
committerYongchaoWu <yongchao.wu@est.tech>2019-07-12 09:01:01 +0000
commitb96580af0070cbe6783445e79d808b2a5c8deaf2 (patch)
treece5fc8f5af33558c0544e03b7527af2bf9e24385 /datafile-app-server/src
parent415aa5b18540b099be556203a35c3f6518eaf7b7 (diff)
Cbs Client integration
Cbs Client is integrated to read configurations from consul Issue-ID: DCAEGEN2-1595 Change-Id: Idb0ebd34eba077f9c1cb584abab4d8722b56f6c5 Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java98
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java80
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java1
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java122
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java1
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java1
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java92
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java62
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java1
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java3
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java1
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test.json62
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json96
24 files changed, 366 insertions, 331 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e9d84640..6e9f7702 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -31,21 +32,26 @@ import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -65,22 +71,15 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
private ConsumerConfiguration dmaapConsumerConfiguration;
- Map<String, PublisherConfiguration> publishingConfigurations;
+ private Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
- private CloudConfigurationProvider cloudConfigurationProvider;
@Value("#{systemEnvironment}")
Properties systemEnvironment;
- Disposable refreshConfigTask = null;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- @Autowired
- public synchronized void setCloudConfigurationProvider(
- CloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
@@ -93,13 +92,25 @@ public class AppConfig {
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
loadConfigurationFromFile();
- refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
- .flatMap(count -> createRefreshConfigurationTask(count, context))
+ refreshConfigTask = createRefreshTask(context) //
.subscribe(e -> logger.info("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
() -> logger.error("Configuration refresh terminated"));
}
+ Flux<AppConfig> createRefreshTask(Map<String, String> context) {
+ return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient)
+ .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig)
+ .onErrorResume(this::onErrorResume);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
/**
* Stops the refreshing of the configuration.
*/
@@ -152,55 +163,31 @@ public class AppConfig {
return ftpesConfiguration;
}
- Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
- return Flux.just(counter) //
- .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
- .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
- .flatMap(this::fetchConfiguration);
- }
-
- Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
- .onErrorResume(AppConfig::onErrorResume);
- }
-
- private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
logger.error("Could not refresh application configuration {}", trowable.toString());
return Mono.empty();
}
- private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
- Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
- .onErrorResume(AppConfig::onErrorResume);
-
- // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
- // other ones does not work
- EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
- .consulHost(env.consulHost()) //
- .consulPort(env.consulPort()) //
- .cbsName(env.cbsName()) //
- .appName(env.appName() + ":dmaap") //
- .build(); //
- Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
- .onErrorResume(t -> Mono.just(new JsonObject()));
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+ }
- return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
- .onErrorResume(AppConfig::onErrorResume);
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
}
/**
* Parse configuration.
*
- * @param serviceConfigRootObject the DFC service's configuration
- * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the
- * serviceConfigRootObject
+ * @param jsonObject the DFC service's configuration
* @return this which is updated if successful
*/
- private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ private AppConfig parseCloudConfig(JsonObject jsonObject) {
try {
- CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ CloudConfigParser parser = new CloudConfigParser(jsonObject);
setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
parser.getFtpesConfig());
+
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
@@ -220,20 +207,21 @@ public class AppConfig {
if (rootObject == null) {
throw new JsonSyntaxException("Root is not a json object");
}
- parseCloudConfig(rootObject, rootObject);
+ parseCloudConfig(rootObject);
} catch (JsonSyntaxException | IOException e) {
logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
- if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
- logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfigurations, ftpesConfig);
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
} else {
this.dmaapConsumerConfiguration = consumerConfiguration;
- this.publishingConfigurations = publisherConfigurations;
+ this.publishingConfigurations = publisherConfiguration;
this.ftpesConfiguration = ftpesConfig;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 0242bef7..d9a9b76a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,14 +18,15 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -40,13 +41,13 @@ public class CloudConfigParser {
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+ private static final String CONFIG = "config";
+
+ private final JsonObject jsonObject;
- private final JsonObject serviceConfigurationRoot;
- private final JsonObject dmaapConfigurationRoot;
+ public CloudConfigParser(JsonObject jsonObject) {
+ this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
- public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
- this.serviceConfigurationRoot = serviceConfigurationRoot;
- this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
/**
@@ -57,33 +58,34 @@ public class CloudConfigParser {
* @throws DatafileTaskException if a member of the configuration is missing.
*/
public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
- Iterator<JsonElement> producerCfgs =
- toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+ JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
+ Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
Map<String, PublisherConfiguration> result = new HashMap<>();
- while (producerCfgs.hasNext()) {
- JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
- String feedName = getAsString(producerCfg, "feedName");
- JsonObject feedConfig = getFeedConfig(feedName);
+ while (changeIdentifierList.hasNext()) {
+
+ String changeIdentifier = changeIdentifierList.next();
+ JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
+ JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
.publishUrl(getAsString(feedConfig, "publish_url")) //
.passWord(getAsString(feedConfig, "password")) //
.userName(getAsString(feedConfig, "username")) //
- .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
- .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
- .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
- .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
- .enableDmaapCertAuth(
- get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(changeIdentifier) //
.logUrl(getAsString(feedConfig, "log_url")) //
.build();
result.put(cfg.changeIdentifier(), cfg);
}
return result;
+
}
/**
@@ -93,21 +95,21 @@ public class CloudConfigParser {
* @throws DatafileTaskException if a member of the configuration is missing.
*/
public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
- JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
if (topics.size() != 1) {
- throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
}
JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
String topicUrl = getAsString(dmaapInfo, "topic_url");
return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
- .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
- .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
- .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
- .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
- .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
@@ -119,10 +121,10 @@ public class CloudConfigParser {
*/
public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
- .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
- .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
- .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
+ .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
@@ -138,20 +140,8 @@ public class CloudConfigParser {
return get(obj, memberName).getAsString();
}
- private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
- JsonElement elem = dmaapConfigurationRoot.get(feedName);
- if (elem == null) {
- elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under serviceConfigurationRoot
- }
- return elem.getAsJsonObject();
+ private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsJsonObject();
}
- private static JsonArray toArray(JsonElement obj) {
- if (obj.isJsonArray()) {
- return obj.getAsJsonArray();
- }
- JsonArray arr = new JsonArray();
- arr.add(obj);
- return arr;
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
index e62a11e0..4db7963d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -62,6 +63,7 @@ public abstract class ConsumerConfiguration {
DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .endpointUrl(topicUrl()) //
.dmaapContentType("application/json") //
.dmaapPortNumber(url.getPort()) //
.dmaapHostName(url.getHost()) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index f3c915b2..ad5f648d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -21,8 +21,8 @@ import java.util.Optional;
import java.util.Properties;
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index 7a845246..d7451bdb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -60,6 +61,7 @@ public interface PublisherConfiguration {
String urlPath = url.getPath();
return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .endpointUrl(publishUrl()) //
.dmaapContentType("application/octet-stream") //
.dmaapPortNumber(url.getPort()) //
.dmaapHostName(url.getHost()) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index bdedba4e..da8361ff 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
+
import java.nio.file.Path;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index abed645a..eed0f0bd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -95,8 +95,9 @@ public class JsonMessageParser {
* @param rawMessage the Json message to parse.
* @return a <code>Flux</code> containing messages.
*/
- public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
+
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
+ return rawMessage.flatMapMany(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -126,10 +127,6 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private static Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 081c7f39..9c33484d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,16 +20,19 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -41,19 +44,20 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
-
+ private final AppConfig datafileAppConfig;
private final JsonMessageParser jsonMessageParser;
- private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final ConsumerReactiveHttpClientFactory httpClientFactory;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
- this.jsonMessageParser = new JsonMessageParser();
- this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ this(datafileAppConfig, new JsonMessageParser(),
+ new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
- protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- JsonMessageParser messageParser) {
- this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.jsonMessageParser = messageParser;
+ protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
+ ConsumerReactiveHttpClientFactory httpClientFactory) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.jsonMessageParser = jsonMessageParser;
+ this.httpClientFactory = httpClientFactory;
}
/**
@@ -63,19 +67,23 @@ public class DMaaPMessageConsumer {
*/
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
- return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
+ try {
+ DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient();
+ return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())));
+ } catch (DatafileTaskException e) {
+ logger.warn("Unable to get response from message router", e);
+ return Flux.empty();
+ }
}
- private Flux<FileReadyMessage> consume(Mono<String> message) {
+ private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
logger.trace("consume called with arg {}", message);
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
- throws DatafileTaskException {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
- WebClient client = new DmaapWebClient().fromConfiguration(config).build();
- return new DMaaPConsumerReactiveHttpClient(config, client);
+ public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
+
+ return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index bdec7199..cfaf1753 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,11 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
+
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 1ce64e41..bccbb5fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -21,6 +21,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 26353e38..de45da31 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -156,7 +158,7 @@ public class ScheduledTasks {
return this.counters;
}
- protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
+ protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index d9ca7871..f661dd0e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -19,20 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import com.google.gson.JsonElement;
@@ -40,29 +37,28 @@ import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import reactor.core.Disposable;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -73,12 +69,14 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
-class AppConfigTest {
+public class AppConfigTest {
- private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- private static final DmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
new ImmutableDmaapConsumerConfiguration.Builder() //
+ .endpointUrl(
+ "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
.timeoutMs(-1) //
.dmaapHostName("message-router.onap.svc.cluster.local") //
.dmaapUserName("admin") //
@@ -97,7 +95,7 @@ class AppConfigTest {
.enableDmaapCertAuth(true) //
.build();
- private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+ public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
.topicUrl(
"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
.trustStorePath("trustStorePath") //
@@ -120,7 +118,7 @@ class AppConfigTest {
.passWord("password") //
.build();
- private static final FtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
new ImmutableFtpesConfig.Builder() //
.keyCert("/config/dfc.jks") //
.keyPassword("secret") //
@@ -128,9 +126,9 @@ class AppConfigTest {
.trustedCaPassword("secret") //
.build();
- private static final DmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapTopicName("/publish/1") //
+ .endpointUrl("https://message-router.onap.svc.cluster.local:3907/publish/1").dmaapTopicName("/publish/1") //
.dmaapUserPassword("password") //
.dmaapPortNumber(3907) //
.dmaapProtocol("https") //
@@ -154,14 +152,14 @@ class AppConfigTest {
}
private AppConfig appConfigUnderTest;
- private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ CbsClient cbsClient = mock(CbsClient.class);
@BeforeEach
- public void setUp() {
+ void setUp() {
appConfigUnderTest = spy(AppConfig.class);
- appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
appConfigUnderTest.systemEnvironment = new Properties();
+
}
@Test
@@ -212,19 +210,14 @@ class AppConfigTest {
// Given
appConfigUnderTest.setFilepath("/temp.json");
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
-
// When
appConfigUnderTest.loadConfigurationFromFile();
// Then
- assertTrue("Error message missing in log.",
- logAppender.list.toString().contains("[WARN] Local configuration file not loaded: /temp.json"));
- logAppender.stop();
-
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
.hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@@ -264,32 +257,31 @@ class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
-
- Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
.create(task) //
.expectSubscription() //
- .expectNextCount(0) //
- .verifyComplete();
+ .verifyComplete(); //
assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
}
@Test
public void whenPeriodicConfigRefreshNoConsul() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+ EnvProperties props = properties();
+ doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
- doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
- Mono<JsonObject> err = Mono.error(new IOException());
- doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ Flux<JsonObject> err = Flux.error(new IOException());
+ doReturn(err).when(cbsClient).updates(any(), any(), any());
- ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
- Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
.create(task) //
.expectSubscription() //
- .expectNextCount(0) //
.verifyComplete();
assertTrue(
@@ -298,13 +290,14 @@ class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
- doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
-
- Mono<JsonObject> json = Mono.just(getJsonRootObject());
+ EnvProperties props = properties();
+ doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
- doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+ Flux<JsonObject> json = Flux.just(getJsonRootObject());
+ doReturn(json).when(cbsClient).updates(any(), any(), any());
- Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
.create(task) //
@@ -317,14 +310,18 @@ class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
- doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+ EnvProperties props = properties();
+ doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
- Mono<JsonObject> json = Mono.just(getJsonRootObject());
- Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the dmaap plugin
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
- doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+ Flux<JsonObject> json = Flux.just(getJsonRootObject());
+ Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
+ // dmaap plugin
- Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+ doReturn(json, err).when(cbsClient).updates(any(), any(), any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
StepVerifier //
.create(task) //
@@ -335,37 +332,6 @@ class AppConfigTest {
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
}
- @Test
- public void whenStopSuccess() {
- Disposable disposableMock = mock(Disposable.class);
- appConfigUnderTest.refreshConfigTask = disposableMock;
-
- appConfigUnderTest.stop();
-
- verify(disposableMock).dispose();
- verifyNoMoreInteractions(disposableMock);
- assertNull(appConfigUnderTest.refreshConfigTask);
- }
-
- @Test
- public void whenNoPublisherConfigurationThrowException() throws DatafileTaskException {
- appConfigUnderTest.publishingConfigurations = new HashMap<>();
-
- DatafileTaskException exception = assertThrows(DatafileTaskException.class,
- () -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
- assertEquals("Cannot find getPublishingConfiguration for changeIdentifier: " + CHANGE_IDENTIFIER,
- exception.getMessage());
- }
-
- @Test
- public void whenFeedIsConfiguredReturnTrue() {
- HashMap<String, PublisherConfiguration> publishingConfigs = new HashMap<>();
- publishingConfigs.put(CHANGE_IDENTIFIER, null);
- appConfigUnderTest.publishingConfigurations = publishingConfigs;
-
- assertTrue(appConfigUnderTest.isFeedConfigured(CHANGE_IDENTIFIER));
- }
-
private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
return rootObject;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
index b630bd09..558eaf0e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
index 9b8197f9..55c796ab 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index 5330a7f3..1c58650d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -33,9 +33,11 @@ import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
+
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Optional;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
index 1e54d29d..499b2608 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
@@ -25,8 +25,10 @@ import static org.mockito.Mockito.when;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import java.net.URI;
import java.net.URISyntaxException;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 2e3245a4..cd18bfa2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -70,6 +70,7 @@ class JsonMessageParserTest {
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES";
private static final String CHANGE_TYPE = "FileReady";
private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady";
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@@ -116,15 +117,14 @@ class JsonMessageParserTest {
.files(files) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNext(expectedMessage).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -173,10 +173,11 @@ class JsonMessageParserTest {
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement1 = new JsonParser().parse(messageString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1)))
.expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
}
@@ -196,7 +197,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -204,8 +204,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue(logAppender.list.toString()
.contains("[ERROR] VES event parsing. File information wrong. " + "Missing location."));
@@ -229,7 +229,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -237,8 +236,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
logAppender.list.toString()
@@ -293,9 +292,10 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
+ JsonElement jsonElement = new JsonParser().parse(messageString);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNext(expectedMessage).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -314,7 +314,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -322,8 +321,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectComplete().verify();
assertTrue("Error missing in log",
logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
@@ -346,7 +345,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -354,8 +352,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
logAppender.list.toString()
@@ -373,7 +371,6 @@ class JsonMessageParserTest {
.notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -381,8 +378,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
@@ -405,7 +402,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -413,8 +409,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
logAppender.list.toString()
@@ -439,7 +435,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -447,8 +442,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
logAppender.list.toString()
@@ -506,15 +501,14 @@ class JsonMessageParserTest {
.files(files) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNext(expectedMessage).verifyComplete();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNext(expectedMessage).verifyComplete();
}
@Test
@@ -523,7 +517,6 @@ class JsonMessageParserTest {
.eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
.build();
- String incorrectMessageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -531,8 +524,8 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
- .expectSubscription().expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectComplete().verify();
assertTrue("Error missing in log",
logAppender.list.toString()
@@ -550,7 +543,7 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -573,7 +566,6 @@ class JsonMessageParserTest {
.addAdditionalField(additionalField) //
.build();
- String messageString = message.toString();
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -581,12 +573,38 @@ class JsonMessageParserTest {
.getJsonObjectFromAnArray(jsonElement);
ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
- .expectSubscription().expectNextCount(0).expectComplete().verify();
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectNextCount(0).expectComplete().verify();
assertTrue("Error missing in log",
logAppender.list.toString()
.contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING + " Change type is wrong: "
+ INCORRECT_CHANGE_TYPE + " Expected: FileReady Message: " + message.getParsed()));
}
+
+ @Test
+ void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
+ AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .name(PM_FILE_NAME) //
+ .location(LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .build();
+ JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+ .changeType(CHANGE_TYPE) //
+ .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+ .addAdditionalField(additionalField) //
+ .build();
+
+ String parsedString = message.getParsed();
+ JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+ JsonElement jsonElement = new JsonParser().parse(parsedString);
+ Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+ .getJsonObjectFromAnArray(jsonElement);
+
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ .expectComplete().verify();
+ }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index 1bea290f..a4319d37 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -20,21 +20,30 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -48,6 +57,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
@@ -76,25 +86,36 @@ public class DMaaPMessageConsumerTest {
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
- private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+ private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private DMaaPConsumerReactiveHttpClient httpClientMock;
private DMaaPMessageConsumer messageConsumer;
private static String ftpesMessageString;
+ private static JsonElement ftpesMessageJson;
private static FileData ftpesFileData;
private static FileReadyMessage expectedFtpesMessage;
private static String sftpMessageString;
+ private static JsonElement sftpMessageJson;
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ private static AppConfig appConfig;
+ private static ConsumerConfiguration dmaapConsumerConfiguration;
+
/**
* Sets up data for the test.
*/
@BeforeAll
public static void setUp() {
+
+ appConfig = mock(AppConfig.class);
+ dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
+
+ JsonParser jsonParser = new JsonParser();
+
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
.compression(GZIP_COMPRESSION) //
@@ -111,6 +132,8 @@ public class DMaaPMessageConsumerTest {
.build();
ftpesMessageString = ftpesJsonMessage.toString();
+ ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+
MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
.productName(PRODUCT_NAME) //
.vendorName(VENDOR_NAME) //
@@ -151,6 +174,7 @@ public class DMaaPMessageConsumerTest {
.addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
+ sftpMessageJson = jsonParser.parse(sftpMessageString);
sftpFileData = ImmutableFileData.builder() //
.name(PM_FILE_NAME) //
.location(SFTP_LOCATION) //
@@ -188,54 +212,62 @@ public class DMaaPMessageConsumerTest {
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
- prepareMocksForDmaapConsumer("", null);
+ prepareMocksForDmaapConsumer(Optional.empty(), null);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectSubscription() //
.expectError(DatafileTaskException.class) //
.verify();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
+ prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedFtpesMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
- prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
+ prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedSftpMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
verifyNoMoreInteractions(httpClientMock);
}
- private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
- Mono<String> messageAsMono = Mono.just(message);
+ private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
+ FileReadyMessage fileReadyMessageAfterConsume) {
+ Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
- when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+ when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
+ try {
+ doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
+ } catch (DatafileTaskException e) {
+ e.printStackTrace();
+ }
- if (!message.isEmpty()) {
- when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
- .thenReturn(Flux.just(fileReadyMessageAfterConsume));
+ if (message.isPresent()) {
+ when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
} else {
- when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+ when(jsonMessageParserMock.getMessagesFromJson(any()))
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumer = spy(new DMaaPMessageConsumer(httpClientMock, jsonMessageParserMock));
+ messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
}
+
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index ddc279c2..fb369174 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import java.io.File;
import java.net.URI;
import java.nio.file.Path;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index 93f20077..1ab97d4a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -32,6 +32,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 5a8d962f..a0096b77 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
@@ -44,6 +45,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -65,6 +67,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
import org.slf4j.MDC;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
index 68f3582f..cfcb7bf9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
@@ -24,6 +24,7 @@ import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+
import org.slf4j.LoggerFactory;
public class LoggingUtils {
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
index 4d4d00ab..0157c7d2 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints_test.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
@@ -1,31 +1,35 @@
{
- "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
- "dmaap.ftpesConfig.keyPassword":"secret",
- "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
- "dmaap.ftpesConfig.trustedCaPassword":"secret",
- "dmaap.security.trustStorePath":"trustStorePath",
- "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
- "dmaap.security.keyStorePath":"keyStorePath",
- "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
- "dmaap.security.enableDmaapCertAuth":"true",
- "dmaap.dmaapProducerConfiguration": {
- "changeIdentifier":"PM_MEAS_FILES",
- "feedName":"feed00"
- },
- "streams_subscribes":{
- "dmaap_subscriber":{
- "dmmap_info":{
- "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
- },
- "type":"message_router"
+ "config": {
+ "//description": "This file is only used for testing purposes",
+ "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword": "secret",
+ "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword": "secret",
+ "dmaap.security.trustStorePath": "trustStorePath",
+ "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
+ "dmaap.security.keyStorePath": "keyStorePath",
+ "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth": "true",
+ "streams_publishes": {
+ "PM_MEAS_FILES": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "user",
+ "log_url": "https://dmaap.example.com/feedlog/972",
+ "publish_url": "https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location": "loc00",
+ "password": "password",
+ "publisher_id": "972.360gm"
+ }
}
- },
- "feed00":{
- "username":"user",
- "log_url":"https://dmaap.example.com/feedlog/972",
- "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
- "location":"loc00",
- "password":"password",
- "publisher_id":"972.360gm"
- }
-}
+ },
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type": "message_router"
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
index a7e2497d..61b324ce 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
@@ -1,49 +1,57 @@
{
- "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
- "dmaap.ftpesConfig.keyPassword":"secret",
- "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
- "dmaap.ftpesConfig.trustedCaPassword":"secret",
- "dmaap.security.trustStorePath":"trustStorePath",
- "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
- "dmaap.security.keyStorePath":"keyStorePath",
- "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
- "dmaap.security.enableDmaapCertAuth":"true",
- "dmaap.dmaapProducerConfiguration":[
- {
- "changeIdentifier":"PM_MEAS_FILES",
- "feedName":"feed00"
+ "config": {
+ "//description": "This file is only used for testing purposes",
+ "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword": "secret",
+ "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword": "secret",
+ "dmaap.security.trustStorePath": "trustStorePath",
+ "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
+ "dmaap.security.keyStorePath": "keyStorePath",
+ "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth": "true",
+ "streams_publishes": {
+ "PM_MEAS_FILES": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "CYE9fl40",
+ "location": "loc00",
+ "log_url": "https://dmaap-dr-prov/feedlog/4",
+ "publisher_id": "4.307dw",
+ "password": "izBJD8nLjawq0HMG",
+ "publish_url": "https://dmaap-dr-prov/publish/4"
+ }
},
- {
- "changeIdentifier":"XX_FILES",
- "feedName":"feed01"
+ "XX_FILES": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "user",
+ "log_url": "feed01::log_url",
+ "publish_url": "feed01::publish_url",
+ "location": "loc00",
+ "password": "",
+ "publisher_id": ""
+ }
},
- {
- "changeIdentifier":"YY_FILES",
- "feedName":"feed01"
+ "YY_FILES": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "user",
+ "log_url": "feed01::log_url",
+ "publish_url": "feed01::publish_url",
+ "location": "loc00",
+ "password": "",
+ "publisher_id": ""
+ }
}
- ],
- "streams_subscribes":{
- "dmaap_subscriber":{
- "dmmap_info":{
- "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
- },
- "type":"message_router"
+ },
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type": "message_router"
}
- },
- "feed00":{
- "username":"user",
- "log_url":"https://dmaap.example.com/feedlog/972",
- "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
- "location":"loc00",
- "password":"password",
- "publisher_id":"972.360gm"
- },
- "feed01":{
- "username":"user",
- "log_url":"feed01::log_url",
- "publish_url":"feed01::publish_url",
- "location":"loc00",
- "password":"",
- "publisher_id":""
- }
-}
+ }
+ }
+} \ No newline at end of file