summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
authorYongchaoWu <yongchao.wu@est.tech>2019-07-12 09:01:01 +0000
committerYongchaoWu <yongchao.wu@est.tech>2019-07-12 09:01:01 +0000
commitb96580af0070cbe6783445e79d808b2a5c8deaf2 (patch)
treece5fc8f5af33558c0544e03b7527af2bf9e24385 /datafile-app-server/src/main
parent415aa5b18540b099be556203a35c3f6518eaf7b7 (diff)
Cbs Client integration
Cbs Client is integrated to read configurations from consul Issue-ID: DCAEGEN2-1595 Change-Id: Idb0ebd34eba077f9c1cb584abab4d8722b56f6c5 Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java98
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java80
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java4
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java9
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java46
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java2
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java1
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java4
11 files changed, 122 insertions, 128 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e9d84640..6e9f7702 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -31,21 +32,26 @@ import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -65,22 +71,15 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
private ConsumerConfiguration dmaapConsumerConfiguration;
- Map<String, PublisherConfiguration> publishingConfigurations;
+ private Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
- private CloudConfigurationProvider cloudConfigurationProvider;
@Value("#{systemEnvironment}")
Properties systemEnvironment;
- Disposable refreshConfigTask = null;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- @Autowired
- public synchronized void setCloudConfigurationProvider(
- CloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
@@ -93,13 +92,25 @@ public class AppConfig {
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
loadConfigurationFromFile();
- refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
- .flatMap(count -> createRefreshConfigurationTask(count, context))
+ refreshConfigTask = createRefreshTask(context) //
.subscribe(e -> logger.info("Refreshed configuration data"),
throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
() -> logger.error("Configuration refresh terminated"));
}
+ Flux<AppConfig> createRefreshTask(Map<String, String> context) {
+ return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient)
+ .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig)
+ .onErrorResume(this::onErrorResume);
+ }
+
+ private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+ final Duration initialDelay = Duration.ZERO;
+ final Duration refreshPeriod = Duration.ofMinutes(1);
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+ }
+
/**
* Stops the refreshing of the configuration.
*/
@@ -152,55 +163,31 @@ public class AppConfig {
return ftpesConfiguration;
}
- Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
- return Flux.just(counter) //
- .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
- .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
- .flatMap(this::fetchConfiguration);
- }
-
- Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
- .onErrorResume(AppConfig::onErrorResume);
- }
-
- private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ private <R> Mono<R> onErrorResume(Throwable trowable) {
logger.error("Could not refresh application configuration {}", trowable.toString());
return Mono.empty();
}
- private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
- Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
- .onErrorResume(AppConfig::onErrorResume);
-
- // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
- // other ones does not work
- EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
- .consulHost(env.consulHost()) //
- .consulPort(env.consulPort()) //
- .cbsName(env.cbsName()) //
- .appName(env.appName() + ":dmaap") //
- .build(); //
- Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
- .onErrorResume(t -> Mono.just(new JsonObject()));
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+ }
- return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
- .onErrorResume(AppConfig::onErrorResume);
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env);
}
/**
* Parse configuration.
*
- * @param serviceConfigRootObject the DFC service's configuration
- * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the
- * serviceConfigRootObject
+ * @param jsonObject the DFC service's configuration
* @return this which is updated if successful
*/
- private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ private AppConfig parseCloudConfig(JsonObject jsonObject) {
try {
- CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ CloudConfigParser parser = new CloudConfigParser(jsonObject);
setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
parser.getFtpesConfig());
+
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
@@ -220,20 +207,21 @@ public class AppConfig {
if (rootObject == null) {
throw new JsonSyntaxException("Root is not a json object");
}
- parseCloudConfig(rootObject, rootObject);
+ parseCloudConfig(rootObject);
} catch (JsonSyntaxException | IOException e) {
logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
- Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
- if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
- logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
- consumerConfiguration, publisherConfigurations, ftpesConfig);
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
} else {
this.dmaapConsumerConfiguration = consumerConfiguration;
- this.publishingConfigurations = publisherConfigurations;
+ this.publishingConfigurations = publisherConfiguration;
this.ftpesConfiguration = ftpesConfig;
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index 0242bef7..d9a9b76a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -18,14 +18,15 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
@@ -40,13 +41,13 @@ public class CloudConfigParser {
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+ private static final String CONFIG = "config";
+
+ private final JsonObject jsonObject;
- private final JsonObject serviceConfigurationRoot;
- private final JsonObject dmaapConfigurationRoot;
+ public CloudConfigParser(JsonObject jsonObject) {
+ this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
- public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
- this.serviceConfigurationRoot = serviceConfigurationRoot;
- this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
/**
@@ -57,33 +58,34 @@ public class CloudConfigParser {
* @throws DatafileTaskException if a member of the configuration is missing.
*/
public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
- Iterator<JsonElement> producerCfgs =
- toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+ JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
+ Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
Map<String, PublisherConfiguration> result = new HashMap<>();
- while (producerCfgs.hasNext()) {
- JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
- String feedName = getAsString(producerCfg, "feedName");
- JsonObject feedConfig = getFeedConfig(feedName);
+ while (changeIdentifierList.hasNext()) {
+
+ String changeIdentifier = changeIdentifierList.next();
+ JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
+ JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
.publishUrl(getAsString(feedConfig, "publish_url")) //
.passWord(getAsString(feedConfig, "password")) //
.userName(getAsString(feedConfig, "username")) //
- .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
- .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
- .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
- .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
- .enableDmaapCertAuth(
- get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(changeIdentifier) //
.logUrl(getAsString(feedConfig, "log_url")) //
.build();
result.put(cfg.changeIdentifier(), cfg);
}
return result;
+
}
/**
@@ -93,21 +95,21 @@ public class CloudConfigParser {
* @throws DatafileTaskException if a member of the configuration is missing.
*/
public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
- JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
if (topics.size() != 1) {
- throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
}
JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
String topicUrl = getAsString(dmaapInfo, "topic_url");
return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
- .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
- .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
- .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
- .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
- .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
@@ -119,10 +121,10 @@ public class CloudConfigParser {
*/
public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
- .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
- .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
- .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
+ .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
@@ -138,20 +140,8 @@ public class CloudConfigParser {
return get(obj, memberName).getAsString();
}
- private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
- JsonElement elem = dmaapConfigurationRoot.get(feedName);
- if (elem == null) {
- elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under serviceConfigurationRoot
- }
- return elem.getAsJsonObject();
+ private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsJsonObject();
}
- private static JsonArray toArray(JsonElement obj) {
- if (obj.isJsonArray()) {
- return obj.getAsJsonArray();
- }
- JsonArray arr = new JsonArray();
- arr.add(obj);
- return arr;
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
index e62a11e0..4db7963d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -62,6 +63,7 @@ public abstract class ConsumerConfiguration {
DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .endpointUrl(topicUrl()) //
.dmaapContentType("application/json") //
.dmaapPortNumber(url.getPort()) //
.dmaapHostName(url.getHost()) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
index f3c915b2..ad5f648d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
@@ -21,8 +21,8 @@ import java.util.Optional;
import java.util.Properties;
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index 7a845246..d7451bdb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import java.net.MalformedURLException;
import java.net.URL;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -60,6 +61,7 @@ public interface PublisherConfiguration {
String urlPath = url.getPath();
return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .endpointUrl(publishUrl()) //
.dmaapContentType("application/octet-stream") //
.dmaapPortNumber(url.getPort()) //
.dmaapHostName(url.getHost()) //
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index bdedba4e..da8361ff 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
+
import java.nio.file.Path;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
import org.slf4j.Logger;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index abed645a..eed0f0bd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -95,8 +95,9 @@ public class JsonMessageParser {
* @param rawMessage the Json message to parse.
* @return a <code>Flux</code> containing messages.
*/
- public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
+
+ public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
+ return rawMessage.flatMapMany(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -126,10 +127,6 @@ public class JsonMessageParser {
: getMessagesFromJsonArray(jsonElement);
}
- private static Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
- }
-
private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 081c7f39..9c33484d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,16 +20,19 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -41,19 +44,20 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
-
+ private final AppConfig datafileAppConfig;
private final JsonMessageParser jsonMessageParser;
- private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final ConsumerReactiveHttpClientFactory httpClientFactory;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
- this.jsonMessageParser = new JsonMessageParser();
- this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ this(datafileAppConfig, new JsonMessageParser(),
+ new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
- protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- JsonMessageParser messageParser) {
- this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.jsonMessageParser = messageParser;
+ protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
+ ConsumerReactiveHttpClientFactory httpClientFactory) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.jsonMessageParser = jsonMessageParser;
+ this.httpClientFactory = httpClientFactory;
}
/**
@@ -63,19 +67,23 @@ public class DMaaPMessageConsumer {
*/
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
- return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
+ try {
+ DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient();
+ return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())));
+ } catch (DatafileTaskException e) {
+ logger.warn("Unable to get response from message router", e);
+ return Flux.empty();
+ }
}
- private Flux<FileReadyMessage> consume(Mono<String> message) {
+ private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
logger.trace("consume called with arg {}", message);
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
- throws DatafileTaskException {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
- WebClient client = new DmaapWebClient().fromConfiguration(config).build();
- return new DMaaPConsumerReactiveHttpClient(config, client);
+ public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
+
+ return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index bdec7199..cfaf1753 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -22,11 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
+
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 1ce64e41..bccbb5fc 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -21,6 +21,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 26353e38..de45da31 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -156,7 +158,7 @@ public class ScheduledTasks {
return this.counters;
}
- protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
+ protected DMaaPMessageConsumer createConsumerTask() {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}