From b96580af0070cbe6783445e79d808b2a5c8deaf2 Mon Sep 17 00:00:00 2001 From: YongchaoWu Date: Fri, 12 Jul 2019 09:01:01 +0000 Subject: Cbs Client integration Cbs Client is integrated to read configurations from consul Issue-ID: DCAEGEN2-1595 Change-Id: Idb0ebd34eba077f9c1cb584abab4d8722b56f6c5 Signed-off-by: YongchaoWu --- .../datafile/configuration/AppConfig.java | 98 ++++++++++------------ .../datafile/configuration/CloudConfigParser.java | 80 ++++++++---------- .../configuration/ConsumerConfiguration.java | 2 + .../configuration/EnvironmentProcessor.java | 4 +- .../configuration/PublisherConfiguration.java | 2 + .../collectors/datafile/ftp/SftpClient.java | 2 + .../datafile/service/JsonMessageParser.java | 9 +- .../datafile/tasks/DMaaPMessageConsumer.java | 46 +++++----- .../datafile/tasks/DataRouterPublisher.java | 2 + .../collectors/datafile/tasks/FileCollector.java | 1 + .../collectors/datafile/tasks/ScheduledTasks.java | 4 +- 11 files changed, 122 insertions(+), 128 deletions(-) (limited to 'datafile-app-server/src/main/java') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index e9d84640..6e9f7702 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.gson.TypeAdapterFactory; + import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -31,21 +32,26 @@ import java.time.Duration; import java.util.Map; import java.util.Properties; import java.util.ServiceLoader; + import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,22 +71,15 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); private ConsumerConfiguration dmaapConsumerConfiguration; - Map publishingConfigurations; + private Map publishingConfigurations; private FtpesConfig ftpesConfiguration; - private CloudConfigurationProvider cloudConfigurationProvider; @Value("#{systemEnvironment}") Properties systemEnvironment; - Disposable refreshConfigTask = null; + private Disposable refreshConfigTask = null; @NotEmpty private String filepath; - @Autowired - public synchronized void setCloudConfigurationProvider( - CloudConfigurationProvider reactiveCloudConfigurationProvider) { - this.cloudConfigurationProvider = reactiveCloudConfigurationProvider; - } - public synchronized void setFilepath(String filepath) { this.filepath = filepath; } @@ -93,13 +92,25 @@ public class AppConfig { Map context = MappedDiagnosticContext.initializeTraceContext(); loadConfigurationFromFile(); - refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5)) - .flatMap(count -> createRefreshConfigurationTask(count, context)) + refreshConfigTask = createRefreshTask(context) // .subscribe(e -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } + Flux createRefreshTask(Map context) { + return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient) + .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig) + .onErrorResume(this::onErrorResume); + } + + private Flux periodicConfigurationUpdates(CbsClient cbsClient) { + final Duration initialDelay = Duration.ZERO; + final Duration refreshPeriod = Duration.ofMinutes(1); + final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create()); + return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod); + } + /** * Stops the refreshing of the configuration. */ @@ -152,55 +163,31 @@ public class AppConfig { return ftpesConfiguration; } - Flux createRefreshConfigurationTask(Long counter, Map context) { - return Flux.just(counter) // - .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) // - .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) // - .flatMap(this::fetchConfiguration); - } - - Mono readEnvironmentVariables(Properties systemEnvironment, Map context) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) - .onErrorResume(AppConfig::onErrorResume); - } - - private static Mono onErrorResume(Throwable trowable) { + private Mono onErrorResume(Throwable trowable) { logger.error("Could not refresh application configuration {}", trowable.toString()); return Mono.empty(); } - private Mono fetchConfiguration(EnvProperties env) { - Mono serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) // - .onErrorResume(AppConfig::onErrorResume); - - // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the - // other ones does not work - EnvProperties dmaapEnv = ImmutableEnvProperties.builder() // - .consulHost(env.consulHost()) // - .consulPort(env.consulPort()) // - .cbsName(env.cbsName()) // - .appName(env.appName() + ":dmaap") // - .build(); // - Mono dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv) - .onErrorResume(t -> Mono.just(new JsonObject())); + Mono getEnvironment(Properties systemEnvironment, Map context) { + return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context); + } - return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) // - .onErrorResume(AppConfig::onErrorResume); + Mono createCbsClient(EnvProperties env) { + return CbsClientFactory.createCbsClient(env); } /** * Parse configuration. * - * @param serviceConfigRootObject the DFC service's configuration - * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the - * serviceConfigRootObject + * @param jsonObject the DFC service's configuration * @return this which is updated if successful */ - private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) { + private AppConfig parseCloudConfig(JsonObject jsonObject) { try { - CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject); + CloudConfigParser parser = new CloudConfigParser(jsonObject); setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig()); + } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); } @@ -220,20 +207,21 @@ public class AppConfig { if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } - parseCloudConfig(rootObject, rootObject); + parseCloudConfig(rootObject); } catch (JsonSyntaxException | IOException e) { logger.warn("Local configuration file not loaded: {}", filepath, e); } } private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration, - Map publisherConfigurations, FtpesConfig ftpesConfig) { - if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) { - logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}", - consumerConfiguration, publisherConfigurations, ftpesConfig); + Map publisherConfiguration, FtpesConfig ftpesConfig) { + if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) { + logger.error( + "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}", + consumerConfiguration, publisherConfiguration, ftpesConfig); } else { this.dmaapConsumerConfiguration = consumerConfiguration; - this.publishingConfigurations = publisherConfigurations; + this.publishingConfigurations = publisherConfiguration; this.ftpesConfiguration = ftpesConfig; } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index 0242bef7..d9a9b76a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -18,14 +18,15 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; /** @@ -40,13 +41,13 @@ public class CloudConfigParser { private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; + private static final String CONFIG = "config"; + + private final JsonObject jsonObject; - private final JsonObject serviceConfigurationRoot; - private final JsonObject dmaapConfigurationRoot; + public CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject.getAsJsonObject(CONFIG); - public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) { - this.serviceConfigurationRoot = serviceConfigurationRoot; - this.dmaapConfigurationRoot = dmaapConfigurationRoot; } /** @@ -57,33 +58,34 @@ public class CloudConfigParser { * @throws DatafileTaskException if a member of the configuration is missing. */ public Map getDmaapPublisherConfigurations() throws DatafileTaskException { - Iterator producerCfgs = - toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator(); + JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject(); + Iterator changeIdentifierList = producerCfgs.keySet().iterator(); Map result = new HashMap<>(); - while (producerCfgs.hasNext()) { - JsonObject producerCfg = producerCfgs.next().getAsJsonObject(); - String feedName = getAsString(producerCfg, "feedName"); - JsonObject feedConfig = getFeedConfig(feedName); + while (changeIdentifierList.hasNext()) { + + String changeIdentifier = changeIdentifierList.next(); + JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier); + JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject(); PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // .passWord(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) // - .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) // - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // - .enableDmaapCertAuth( - get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // - .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) // + .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) // + .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // + .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) // + .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) // + .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .changeIdentifier(changeIdentifier) // .logUrl(getAsString(feedConfig, "log_url")) // .build(); result.put(cfg.changeIdentifier(), cfg); } return result; + } /** @@ -93,21 +95,21 @@ public class CloudConfigParser { * @throws DatafileTaskException if a member of the configuration is missing. */ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { - JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject(); + JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject(); Set> topics = consumerCfg.entrySet(); if (topics.size() != 1) { - throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics); + throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics); } JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject(); + JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); String topicUrl = getAsString(dmaapInfo, "topic_url"); return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) + .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) + .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) + .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) + .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .build(); } @@ -119,10 +121,10 @@ public class CloudConfigParser { */ public FtpesConfig getFtpesConfig() throws DatafileTaskException { return new ImmutableFtpesConfig.Builder() // - .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert")) - .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword")) - .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa")) - .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) // + .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert")) + .keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword")) + .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa")) + .trustedCaPassword(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPassword")) // .build(); } @@ -138,20 +140,8 @@ public class CloudConfigParser { return get(obj, memberName).getAsString(); } - private JsonObject getFeedConfig(String feedName) throws DatafileTaskException { - JsonElement elem = dmaapConfigurationRoot.get(feedName); - if (elem == null) { - elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under serviceConfigurationRoot - } - return elem.getAsJsonObject(); + private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException { + return get(obj, memberName).getAsJsonObject(); } - private static JsonArray toArray(JsonElement obj) { - if (obj.isJsonArray()) { - return obj.getAsJsonArray(); - } - JsonArray arr = new JsonArray(); - arr.add(obj); - return arr; - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index e62a11e0..4db7963d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -62,6 +63,7 @@ public abstract class ConsumerConfiguration { DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); return new ImmutableDmaapConsumerConfiguration.Builder() // + .endpointUrl(topicUrl()) // .dmaapContentType("application/json") // .dmaapPortNumber(url.getPort()) // .dmaapHostName(url.getHost()) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java index f3c915b2..ad5f648d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -21,8 +21,8 @@ import java.util.Optional; import java.util.Properties; import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index 7a845246..d7451bdb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import java.net.MalformedURLException; import java.net.URL; + import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; @@ -60,6 +61,7 @@ public interface PublisherConfiguration { String urlPath = url.getPath(); return new ImmutableDmaapPublisherConfiguration.Builder() // + .endpointUrl(publishUrl()) // .dmaapContentType("application/octet-stream") // .dmaapPortNumber(url.getPort()) // .dmaapHostName(url.getHost()) // diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java index bdedba4e..da8361ff 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java @@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; + import java.nio.file.Path; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.slf4j.Logger; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index abed645a..eed0f0bd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -95,8 +95,9 @@ public class JsonMessageParser { * @param rawMessage the Json message to parse. * @return a Flux containing messages. */ - public Flux getMessagesFromJson(Mono rawMessage) { - return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData); + + public Flux getMessagesFromJson(Mono rawMessage) { + return rawMessage.flatMapMany(this::createMessageData); } Optional getJsonObjectFromAnArray(JsonElement element) { @@ -126,10 +127,6 @@ public class JsonMessageParser { : getMessagesFromJsonArray(jsonElement); } - private static Mono getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message)); - } - private static Flux createMessages(Flux jsonObject) { return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 081c7f39..9c33484d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -20,16 +20,19 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,19 +44,20 @@ import reactor.core.publisher.Mono; */ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); - + private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final ConsumerReactiveHttpClientFactory httpClientFactory; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { - this.jsonMessageParser = new JsonMessageParser(); - this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); + public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + this(datafileAppConfig, new JsonMessageParser(), + new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); } - protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - JsonMessageParser messageParser) { - this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.jsonMessageParser = messageParser; + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, + ConsumerReactiveHttpClientFactory httpClientFactory) { + this.datafileAppConfig = datafileAppConfig; + this.jsonMessageParser = jsonMessageParser; + this.httpClientFactory = httpClientFactory; } /** @@ -63,19 +67,23 @@ public class DMaaPMessageConsumer { */ public Flux getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); - return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + try { + DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient(); + return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))); + } catch (DatafileTaskException e) { + logger.warn("Unable to get response from message router", e); + return Flux.empty(); + } } - private Flux consume(Mono message) { + private Flux consume(Mono message) { logger.trace("consume called with arg {}", message); return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) - throws DatafileTaskException { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); - WebClient client = new DmaapWebClient().fromConfiguration(config).build(); - return new DMaaPConsumerReactiveHttpClient(config, client); + public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { + + return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index bdec7199..cfaf1753 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -22,11 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; import com.google.gson.JsonParser; + import java.io.File; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; + import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index 1ce64e41..bccbb5fc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -21,6 +21,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 26353e38..de45da31 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.Counters; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -156,7 +158,7 @@ public class ScheduledTasks { return this.counters; } - protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException { + protected DMaaPMessageConsumer createConsumerTask() { return new DMaaPMessageConsumer(this.applicationConfiguration); } -- cgit 1.2.3-korg