diff options
author | YongchaoWu <yongchao.wu@est.tech> | 2019-07-12 09:01:01 +0000 |
---|---|---|
committer | YongchaoWu <yongchao.wu@est.tech> | 2019-07-12 09:01:01 +0000 |
commit | b96580af0070cbe6783445e79d808b2a5c8deaf2 (patch) | |
tree | ce5fc8f5af33558c0544e03b7527af2bf9e24385 /datafile-app-server/src | |
parent | 415aa5b18540b099be556203a35c3f6518eaf7b7 (diff) |
Cbs Client integration
Cbs Client is integrated to read configurations from consul
Issue-ID: DCAEGEN2-1595
Change-Id: Idb0ebd34eba077f9c1cb584abab4d8722b56f6c5
Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
Diffstat (limited to 'datafile-app-server/src')
24 files changed, 366 insertions, 331 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e9d84640..6e9f7702 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -31,21 +32,26 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,22 +71,15 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); private ConsumerConfiguration dmaapConsumerConfiguration; - Map<String, PublisherConfiguration> publishingConfigurations; + private Map<String, PublisherConfiguration> publishingConfigurations; private FtpesConfig ftpesConfiguration; - private CloudConfigurationProvider cloudConfigurationProvider; @Value("#{systemEnvironment}") Properties systemEnvironment; - Disposable refreshConfigTask = null; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - @Autowired - public synchronized void setCloudConfigurationProvider( - CloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - public synchronized void setFilepath(String filepath) { this.filepath = filepath; } @@ -93,13 +92,25 @@ public class AppConfig { Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); loadConfigurationFromFile(); - refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) - .flatMap(count -> createRefreshConfigurationTask(count, context)) + refreshConfigTask = createRefreshTask(context) // .subscribe(e -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } + Flux<AppConfig> createRefreshTask(Map<String, String> context) { + return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient) + .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig) + .onErrorResume(this::onErrorResume); + } + + private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) { + final Duration initialDelay = Duration.ZERO; + final Duration refreshPeriod = Duration.ofMinutes(1); + final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); + return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); + } + /** * Stops the refreshing of the configuration. */ @@ -152,55 +163,31 @@ public class AppConfig { return ftpesConfiguration; } - Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) { - return Flux.just(counter) // - .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // - .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // - .flatMap(this::fetchConfiguration); - } - - Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) - .onErrorResume(AppConfig::onErrorResume); - } - - private static <R> Mono<R> onErrorResume(Throwable trowable) { + private <R> Mono<R> onErrorResume(Throwable trowable) { logger.error("Could not refresh application configuration {}", trowable.toString()); return Mono.empty(); } - private Mono<AppConfig> fetchConfiguration(EnvProperties env) { - Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // - .onErrorResume(AppConfig::onErrorResume); - - // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the - // other ones does not work - EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // - .consulHost(env.consulHost()) // - .consulPort(env.consulPort()) // - .cbsName(env.cbsName()) // - .appName(env.appName() + ":dmaap") // - .build(); // - Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) - .onErrorResume(t -> Mono.just(new JsonObject())); + Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context); + } - return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // - .onErrorResume(AppConfig::onErrorResume); + Mono<CbsClient> createCbsClient(EnvProperties env) { + return CbsClientFactory.createCbsClient(env); } /** * Parse configuration. * - * @param serviceConfigRootObject the DFC service's configuration - * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the - * serviceConfigRootObject + * @param jsonObject the DFC service's configuration * @return this which is updated if successful */ - private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + private AppConfig parseCloudConfig(JsonObject jsonObject) { try { - CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + CloudConfigParser parser = new CloudConfigParser(jsonObject); setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig()); + } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -220,20 +207,21 @@ public class AppConfig { if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } - parseCloudConfig(rootObject, rootObject); + parseCloudConfig(rootObject); } catch (JsonSyntaxException | IOException e) { logger.warn("Local configuration file not loaded: {}", filepath, e); } } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) { - if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) { - logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfigurations, ftpesConfig); + Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; - this.publishingConfigurations = publisherConfigurations; + this.publishingConfigurations = publisherConfiguration; this.ftpesConfiguration = ftpesConfig; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 0242bef7..d9a9b76a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -18,14 +18,15 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -40,13 +41,13 @@ public class CloudConfigParser { private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; + private static final String CONFIG = "config"; + + private final JsonObject jsonObject; - private final JsonObject serviceConfigurationRoot; - private final JsonObject dmaapConfigurationRoot; + public CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject.getAsJsonObject(CONFIG); - public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) { - this.serviceConfigurationRoot = serviceConfigurationRoot; - this.dmaapConfigurationRoot = dmaapConfigurationRoot; } /** @@ -57,33 +58,34 @@ public class CloudConfigParser { * @throws DatafileTaskException if a member of the configuration is missing. */ public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException { - Iterator<JsonElement> producerCfgs = - toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject(); + Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator(); Map<String, PublisherConfiguration> result = new HashMap<>(); - while (producerCfgs.hasNext()) { - JsonObject producerCfg = producerCfgs.next().getAsJsonObject(); - String feedName = getAsString(producerCfg, "feedName"); - JsonObject feedConfig = getFeedConfig(feedName); + while (changeIdentifierList.hasNext()) { + + String changeIdentifier = changeIdentifierList.next(); + JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier); + JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject(); PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // .passWord(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // - .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // - .enableDmaapCertAuth( - get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(changeIdentifier) // .logUrl(getAsString(feedConfig, "log_url")) // .build(); result.put(cfg.changeIdentifier(), cfg); } return result; + } /** @@ -93,21 +95,21 @@ public class CloudConfigParser { * @throws DatafileTaskException if a member of the configuration is missing. */ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { - JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); + JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject(); Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); if (topics.size() != 1) { - throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics); + throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics); } JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); String topicUrl = getAsString(dmaapInfo, "topic_url"); return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } @@ -119,10 +121,10 @@ public class CloudConfigParser { */ public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) - .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) - .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) - .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // + .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPassword")) // .build(); } @@ -138,20 +140,8 @@ public class CloudConfigParser { return get(obj, memberName).getAsString(); } - private JsonObject getFeedConfig(String feedName) throws DatafileTaskException { - JsonElement elem = dmaapConfigurationRoot.get(feedName); - if (elem == null) { - elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under serviceConfigurationRoot - } - return elem.getAsJsonObject(); + private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException { + return get(obj, memberName).getAsJsonObject(); } - private static JsonArray toArray(JsonElement obj) { - if (obj.isJsonArray()) { - return obj.getAsJsonArray(); - } - JsonArray arr = new JsonArray(); - arr.add(obj); - return arr; - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index e62a11e0..4db7963d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -62,6 +63,7 @@ public abstract class ConsumerConfiguration { DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); return new ImmutableDmaapConsumerConfiguration.Builder() // + .endpointUrl(topicUrl()) // .dmaapContentType("application/json") // .dmaapPortNumber(url.getPort()) // .dmaapHostName(url.getHost()) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index f3c915b2..ad5f648d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -21,8 +21,8 @@ import java.util.Optional; import java.util.Properties; import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index 7a845246..d7451bdb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -60,6 +61,7 @@ public interface PublisherConfiguration { String urlPath = url.getPath(); return new ImmutableDmaapPublisherConfiguration.Builder() // + .endpointUrl(publishUrl()) // .dmaapContentType("application/octet-stream") // .dmaapPortNumber(url.getPort()) // .dmaapHostName(url.getHost()) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index bdedba4e..da8361ff 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index abed645a..eed0f0bd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -95,8 +95,9 @@ public class JsonMessageParser { * @param rawMessage the Json message to parse. * @return a <code>Flux</code> containing messages. */ - public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { - return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData); + + public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) { + return rawMessage.flatMapMany(this::createMessageData); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -126,10 +127,6 @@ public class JsonMessageParser { : getMessagesFromJsonArray(jsonElement); } - private static Mono<JsonElement> getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 081c7f39..9c33484d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -20,16 +20,19 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,19 +44,20 @@ import reactor.core.publisher.Mono; */ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); - + private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final ConsumerReactiveHttpClientFactory httpClientFactory; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { - this.jsonMessageParser = new JsonMessageParser(); - this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); + public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + this(datafileAppConfig, new JsonMessageParser(), + new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); } - protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - JsonMessageParser messageParser) { - this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.jsonMessageParser = messageParser; + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, + ConsumerReactiveHttpClientFactory httpClientFactory) { + this.datafileAppConfig = datafileAppConfig; + this.jsonMessageParser = jsonMessageParser; + this.httpClientFactory = httpClientFactory; } /** @@ -63,19 +67,23 @@ public class DMaaPMessageConsumer { */ public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); - return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + try { + DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient(); + return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))); + } catch (DatafileTaskException e) { + logger.warn("Unable to get response from message router", e); + return Flux.empty(); + } } - private Flux<FileReadyMessage> consume(Mono<String> message) { + private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { logger.trace("consume called with arg {}", message); return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) - throws DatafileTaskException { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); - WebClient client = new DmaapWebClient().fromConfiguration(config).build(); - return new DMaaPConsumerReactiveHttpClient(config, client); + public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { + + return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index bdec7199..cfaf1753 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,11 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.File; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; + import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 1ce64e41..bccbb5fc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -21,6 +21,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 26353e38..de45da31 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.Counters; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -156,7 +158,7 @@ public class ScheduledTasks { return this.counters; } - protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java index d9ca7871..f661dd0e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java @@ -19,20 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import com.google.common.base.Charsets; import com.google.common.io.Resources; import com.google.gson.JsonElement; @@ -40,29 +37,28 @@ import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Map; import java.util.Properties; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; -import reactor.core.Disposable; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -73,12 +69,14 @@ import reactor.test.StepVerifier; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ -class AppConfigTest { +public class AppConfigTest { - private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - private static final DmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // + public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // new ImmutableDmaapConsumerConfiguration.Builder() // + .endpointUrl( + "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") .timeoutMs(-1) // .dmaapHostName("message-router.onap.svc.cluster.local") // .dmaapUserName("admin") // @@ -97,7 +95,7 @@ class AppConfigTest { .enableDmaapCertAuth(true) // .build(); - private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() // + public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() // .topicUrl( "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") .trustStorePath("trustStorePath") // @@ -120,7 +118,7 @@ class AppConfigTest { .passWord("password") // .build(); - private static final FtpesConfig CORRECT_FTPES_CONFIGURATION = // + private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // new ImmutableFtpesConfig.Builder() // .keyCert("/config/dfc.jks") // .keyPassword("secret") // @@ -128,9 +126,9 @@ class AppConfigTest { .trustedCaPassword("secret") // .build(); - private static final DmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // + private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // new ImmutableDmaapPublisherConfiguration.Builder() // - .dmaapTopicName("/publish/1") // + .endpointUrl("https://message-router.onap.svc.cluster.local:3907/publish/1").dmaapTopicName("/publish/1") // .dmaapUserPassword("password") // .dmaapPortNumber(3907) // .dmaapProtocol("https") // @@ -154,14 +152,14 @@ class AppConfigTest { } private AppConfig appConfigUnderTest; - private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class); private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); + CbsClient cbsClient = mock(CbsClient.class); @BeforeEach - public void setUp() { + void setUp() { appConfigUnderTest = spy(AppConfig.class); - appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider); appConfigUnderTest.systemEnvironment = new Properties(); + } @Test @@ -212,19 +210,14 @@ class AppConfigTest { // Given appConfigUnderTest.setFilepath("/temp.json"); - ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); - // When appConfigUnderTest.loadConfigurationFromFile(); // Then - assertTrue("Error message missing in log.", - logAppender.list.toString().contains("[WARN] Local configuration file not loaded: /temp.json")); - logAppender.stop(); - Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)) .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES"); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } @@ -264,32 +257,31 @@ class AppConfigTest { @Test public void whenPeriodicConfigRefreshNoEnvironmentVariables() { ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); - - Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context); StepVerifier // .create(task) // .expectSubscription() // - .expectNextCount(0) // - .verifyComplete(); + .verifyComplete(); // assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")); } @Test public void whenPeriodicConfigRefreshNoConsul() { + ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); + EnvProperties props = properties(); + doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); - doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); - Mono<JsonObject> err = Mono.error(new IOException()); - doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); + Flux<JsonObject> err = Flux.error(new IOException()); + doReturn(err).when(cbsClient).updates(any(), any(), any()); - ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class); - Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context); StepVerifier // .create(task) // .expectSubscription() // - .expectNextCount(0) // .verifyComplete(); assertTrue( @@ -298,13 +290,14 @@ class AppConfigTest { @Test public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException { - doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); - - Mono<JsonObject> json = Mono.just(getJsonRootObject()); + EnvProperties props = properties(); + doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); - doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + Flux<JsonObject> json = Flux.just(getJsonRootObject()); + doReturn(json).when(cbsClient).updates(any(), any(), any()); - Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context); StepVerifier // .create(task) // @@ -317,14 +310,18 @@ class AppConfigTest { @Test public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException { - doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any()); + EnvProperties props = properties(); + doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); - Mono<JsonObject> json = Mono.just(getJsonRootObject()); - Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the dmaap plugin + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); - doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any()); + Flux<JsonObject> json = Flux.just(getJsonRootObject()); + Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the + // dmaap plugin - Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context); + doReturn(json, err).when(cbsClient).updates(any(), any(), any()); + + Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context); StepVerifier // .create(task) // @@ -335,37 +332,6 @@ class AppConfigTest { Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); } - @Test - public void whenStopSuccess() { - Disposable disposableMock = mock(Disposable.class); - appConfigUnderTest.refreshConfigTask = disposableMock; - - appConfigUnderTest.stop(); - - verify(disposableMock).dispose(); - verifyNoMoreInteractions(disposableMock); - assertNull(appConfigUnderTest.refreshConfigTask); - } - - @Test - public void whenNoPublisherConfigurationThrowException() throws DatafileTaskException { - appConfigUnderTest.publishingConfigurations = new HashMap<>(); - - DatafileTaskException exception = assertThrows(DatafileTaskException.class, - () -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER)); - assertEquals("Cannot find getPublishingConfiguration for changeIdentifier: " + CHANGE_IDENTIFIER, - exception.getMessage()); - } - - @Test - public void whenFeedIsConfiguredReturnTrue() { - HashMap<String, PublisherConfiguration> publishingConfigs = new HashMap<>(); - publishingConfigs.put(CHANGE_IDENTIFIER, null); - appConfigUnderTest.publishingConfigurations = publishingConfigs; - - assertTrue(appConfigUnderTest.isFeedConfigured(CHANGE_IDENTIFIER)); - } - private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException { JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject(); return rootObject; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java index b630bd09..558eaf0e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java index 9b8197f9..55c796ab 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index 5330a7f3..1c58650d 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -33,9 +33,11 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; + import java.io.IOException; import java.nio.file.Paths; import java.util.Optional; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java index 1e54d29d..499b2608 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java @@ -25,8 +25,10 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import java.net.URI; import java.net.URISyntaxException; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 2e3245a4..cd18bfa2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -70,6 +70,7 @@ class JsonMessageParserTest { private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES"; private static final String CHANGE_TYPE = "FileReady"; private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady"; private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @@ -116,15 +117,14 @@ class JsonMessageParserTest { .files(files) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNext(expectedMessage).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNext(expectedMessage).verifyComplete(); } @Test @@ -173,10 +173,11 @@ class JsonMessageParserTest { String messageString = "[" + parsedString + "," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement1 = new JsonParser().parse(messageString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1))) .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); } @@ -196,7 +197,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -204,8 +204,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue(logAppender.list.toString() .contains("[ERROR] VES event parsing. File information wrong. " + "Missing location.")); @@ -229,7 +229,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -237,8 +236,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", logAppender.list.toString() @@ -293,9 +292,10 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); + JsonElement jsonElement = new JsonParser().parse(messageString); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNext(expectedMessage).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNext(expectedMessage).verifyComplete(); } @Test @@ -314,7 +314,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -322,8 +321,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectComplete().verify(); assertTrue("Error missing in log", logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING @@ -346,7 +345,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -354,8 +352,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", logAppender.list.toString() @@ -373,7 +371,6 @@ class JsonMessageParserTest { .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -381,8 +378,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING @@ -405,7 +402,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -413,8 +409,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", logAppender.list.toString() @@ -439,7 +435,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -447,8 +442,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", logAppender.list.toString() @@ -506,15 +501,14 @@ class JsonMessageParserTest { .files(files) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNext(expectedMessage).verifyComplete(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNext(expectedMessage).verifyComplete(); } @Test @@ -523,7 +517,6 @@ class JsonMessageParserTest { .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // .build(); - String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -531,8 +524,8 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString))) - .expectSubscription().expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectComplete().verify(); assertTrue("Error missing in log", logAppender.list.toString() @@ -550,7 +543,7 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() .expectComplete().verify(); assertTrue("Error missing in log", @@ -573,7 +566,6 @@ class JsonMessageParserTest { .addAdditionalField(additionalField) // .build(); - String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); JsonElement jsonElement = new JsonParser().parse(parsedString); @@ -581,12 +573,38 @@ class JsonMessageParserTest { .getJsonObjectFromAnArray(jsonElement); ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString))) - .expectSubscription().expectNextCount(0).expectComplete().verify(); + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectNextCount(0).expectComplete().verify(); assertTrue("Error missing in log", logAppender.list.toString() .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING + " Change type is wrong: " + INCORRECT_CHANGE_TYPE + " Expected: FileReady Message: " + message.getParsed())); } + + @Test + void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() { + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // + .build(); + + String parsedString = message.getParsed(); + JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); + JsonElement jsonElement = new JsonParser().parse(parsedString); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) + .getJsonObjectFromAnArray(jsonElement); + + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + .expectComplete().verify(); + } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index 1bea290f..a4319d37 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -20,21 +20,30 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; @@ -48,6 +57,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; import reactor.core.publisher.Flux; @@ -76,25 +86,36 @@ public class DMaaPMessageConsumerTest { private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; - private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>(); + private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>(); private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private DMaaPConsumerReactiveHttpClient httpClientMock; private DMaaPMessageConsumer messageConsumer; private static String ftpesMessageString; + private static JsonElement ftpesMessageJson; private static FileData ftpesFileData; private static FileReadyMessage expectedFtpesMessage; private static String sftpMessageString; + private static JsonElement sftpMessageJson; private static FileData sftpFileData; private static FileReadyMessage expectedSftpMessage; + private static AppConfig appConfig; + private static ConsumerConfiguration dmaapConsumerConfiguration; + /** * Sets up data for the test. */ @BeforeAll public static void setUp() { + + appConfig = mock(AppConfig.class); + dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG; + + JsonParser jsonParser = new JsonParser(); + AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() // .location(FTPES_LOCATION) // .compression(GZIP_COMPRESSION) // @@ -111,6 +132,8 @@ public class DMaaPMessageConsumerTest { .build(); ftpesMessageString = ftpesJsonMessage.toString(); + ftpesMessageJson = jsonParser.parse(ftpesMessageString); + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // .productName(PRODUCT_NAME) // .vendorName(VENDOR_NAME) // @@ -151,6 +174,7 @@ public class DMaaPMessageConsumerTest { .addAdditionalField(sftpAdditionalField) // .build(); sftpMessageString = sftpJsonMessage.toString(); + sftpMessageJson = jsonParser.parse(sftpMessageString); sftpFileData = ImmutableFileData.builder() // .name(PM_FILE_NAME) // .location(SFTP_LOCATION) // @@ -188,54 +212,62 @@ public class DMaaPMessageConsumerTest { @Test public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { - prepareMocksForDmaapConsumer("", null); + prepareMocksForDmaapConsumer(Optional.empty(), null); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectSubscription() // .expectError(DatafileTaskException.class) // .verify(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); + prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedFtpesMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); verifyNoMoreInteractions(httpClientMock); } @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); + prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedSftpMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); verifyNoMoreInteractions(httpClientMock); } - private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { - Mono<String> messageAsMono = Mono.just(message); + private void prepareMocksForDmaapConsumer(Optional<JsonElement> message, + FileReadyMessage fileReadyMessageAfterConsume) { + Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty(); JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class); - when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono); + when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono); + when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); + ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); + try { + doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap()); + } catch (DatafileTaskException e) { + e.printStackTrace(); + } - if (!message.isEmpty()) { - when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) - .thenReturn(Flux.just(fileReadyMessageAfterConsume)); + if (message.isPresent()) { + when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume)); } else { - when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) + when(jsonMessageParserMock.getMessagesFromJson(any())) .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumer = spy(new DMaaPMessageConsumer(httpClientMock, jsonMessageParserMock)); + messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory)); } + } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index ddc279c2..fb369174 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import java.io.File; import java.net.URI; import java.nio.file.Path; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.http.Header; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 93f20077..1ab97d4a 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -32,6 +32,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 5a8d962f..a0096b77 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; @@ -44,6 +45,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; + import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -65,6 +67,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; import org.slf4j.MDC; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java index 68f3582f..cfcb7bf9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java @@ -24,6 +24,7 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; + import org.slf4j.LoggerFactory; public class LoggingUtils { diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json index 4d4d00ab..0157c7d2 100644 --- a/datafile-app-server/src/test/resources/datafile_endpoints_test.json +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json @@ -1,31 +1,35 @@ { - "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", - "dmaap.ftpesConfig.keyPassword":"secret", - "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", - "dmaap.ftpesConfig.trustedCaPassword":"secret", - "dmaap.security.trustStorePath":"trustStorePath", - "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", - "dmaap.security.keyStorePath":"keyStorePath", - "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", - "dmaap.security.enableDmaapCertAuth":"true", - "dmaap.dmaapProducerConfiguration": { - "changeIdentifier":"PM_MEAS_FILES", - "feedName":"feed00" - }, - "streams_subscribes":{ - "dmaap_subscriber":{ - "dmmap_info":{ - "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" - }, - "type":"message_router" + "config": { + "//description": "This file is only used for testing purposes", + "dmaap.ftpesConfig.keyCert": "/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword": "secret", + "dmaap.ftpesConfig.trustedCa": "config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword": "secret", + "dmaap.security.trustStorePath": "trustStorePath", + "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath", + "dmaap.security.keyStorePath": "keyStorePath", + "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth": "true", + "streams_publishes": { + "PM_MEAS_FILES": { + "type": "data_router", + "dmaap_info": { + "username": "user", + "log_url": "https://dmaap.example.com/feedlog/972", + "publish_url": "https://message-router.onap.svc.cluster.local:3907/publish/1", + "location": "loc00", + "password": "password", + "publisher_id": "972.360gm" + } } - }, - "feed00":{ - "username":"user", - "log_url":"https://dmaap.example.com/feedlog/972", - "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", - "location":"loc00", - "password":"password", - "publisher_id":"972.360gm" - } -} + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type": "message_router" + } + } + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json index a7e2497d..61b324ce 100644 --- a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json @@ -1,49 +1,57 @@ { - "dmaap.ftpesConfig.keyCert":"/config/dfc.jks", - "dmaap.ftpesConfig.keyPassword":"secret", - "dmaap.ftpesConfig.trustedCa":"config/ftp.jks", - "dmaap.ftpesConfig.trustedCaPassword":"secret", - "dmaap.security.trustStorePath":"trustStorePath", - "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath", - "dmaap.security.keyStorePath":"keyStorePath", - "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath", - "dmaap.security.enableDmaapCertAuth":"true", - "dmaap.dmaapProducerConfiguration":[ - { - "changeIdentifier":"PM_MEAS_FILES", - "feedName":"feed00" + "config": { + "//description": "This file is only used for testing purposes", + "dmaap.ftpesConfig.keyCert": "/config/dfc.jks", + "dmaap.ftpesConfig.keyPassword": "secret", + "dmaap.ftpesConfig.trustedCa": "config/ftp.jks", + "dmaap.ftpesConfig.trustedCaPassword": "secret", + "dmaap.security.trustStorePath": "trustStorePath", + "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath", + "dmaap.security.keyStorePath": "keyStorePath", + "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath", + "dmaap.security.enableDmaapCertAuth": "true", + "streams_publishes": { + "PM_MEAS_FILES": { + "type": "data_router", + "dmaap_info": { + "username": "CYE9fl40", + "location": "loc00", + "log_url": "https://dmaap-dr-prov/feedlog/4", + "publisher_id": "4.307dw", + "password": "izBJD8nLjawq0HMG", + "publish_url": "https://dmaap-dr-prov/publish/4" + } }, - { - "changeIdentifier":"XX_FILES", - "feedName":"feed01" + "XX_FILES": { + "type": "data_router", + "dmaap_info": { + "username": "user", + "log_url": "feed01::log_url", + "publish_url": "feed01::publish_url", + "location": "loc00", + "password": "", + "publisher_id": "" + } }, - { - "changeIdentifier":"YY_FILES", - "feedName":"feed01" + "YY_FILES": { + "type": "data_router", + "dmaap_info": { + "username": "user", + "log_url": "feed01::log_url", + "publish_url": "feed01::publish_url", + "location": "loc00", + "password": "", + "publisher_id": "" + } } - ], - "streams_subscribes":{ - "dmaap_subscriber":{ - "dmmap_info":{ - "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" - }, - "type":"message_router" + }, + "streams_subscribes": { + "dmaap_subscriber": { + "dmaap_info": { + "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + }, + "type": "message_router" } - }, - "feed00":{ - "username":"user", - "log_url":"https://dmaap.example.com/feedlog/972", - "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1", - "location":"loc00", - "password":"password", - "publisher_id":"972.360gm" - }, - "feed01":{ - "username":"user", - "log_url":"feed01::log_url", - "publish_url":"feed01::publish_url", - "location":"loc00", - "password":"", - "publisher_id":"" - } -} + } + } +}
\ No newline at end of file |