aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java93
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java133
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java91
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java91
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java5
12 files changed, 172 insertions, 403 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index c257ceed..d933e337 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -41,8 +41,9 @@ import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticConte
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,12 +72,12 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
private ConsumerConfiguration dmaapConsumerConfiguration;
private Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
private SftpConfig sftpConfiguration;
- @Value("#{systemEnvironment}")
- Properties systemEnvironment;
private Disposable refreshConfigTask = null;
@NotEmpty
@@ -102,8 +103,8 @@ public class AppConfig {
}
Flux<AppConfig> createRefreshTask(Map<String, String> context) {
- return getEnvironment(systemEnvironment, context) //
- .flatMap(this::createCbsClient) //
+ return createCbsClientConfiguration()
+ .flatMap(this::createCbsClient)
.flatMapMany(this::periodicConfigurationUpdates) //
.map(this::parseCloudConfig) //
.onErrorResume(this::onErrorResume);
@@ -175,19 +176,25 @@ public class AppConfig {
return Mono.empty();
}
- Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+ Mono<CbsClientConfiguration> createCbsClientConfiguration() {
+ try {
+ return Mono.just(CbsClientConfiguration.fromEnvironment());
+ } catch (CbsClientConfigurationException e) {
+ return Mono.error(e);
+ }
}
- Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
+ Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
+ return CbsClientFactory.createCbsClient(cbsClientConfiguration);
}
private AppConfig parseCloudConfig(JsonObject configurationObject) {
try {
- CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment);
- setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
- parser.getFtpesConfig(), parser.getSftpConfig());
+ CloudConfigParser parser =
+ new CloudConfigParser(configurationObject, systemEnvironment);
+ setConfiguration(parser.getConsumerConfiguration(),
+ parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig(),
+ parser.getSftpConfig());
logConfig();
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
@@ -207,8 +214,7 @@ public class AppConfig {
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
try (InputStream inputStream = createInputStream(filepath)) {
- JsonParser parser = new JsonParser();
- JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
if (rootObject == null) {
throw new JsonSyntaxException("Root is not a json object");
}
@@ -228,8 +234,8 @@ public class AppConfig {
this.sftpConfiguration = sftpConfig;
}
- JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
- return parser.parse(new InputStreamReader(inputStream));
+ JsonElement getJsonElement(InputStream inputStream) {
+ return JsonParser.parseReader(new InputStreamReader(inputStream));
}
InputStream createInputStream(@NotNull String filepath) throws IOException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index a86a32b8..6ace4aae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import javax.validation.constraints.NotNull;
+import io.vavr.collection.Stream;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
/**
* Parses the cloud configuration.
@@ -51,6 +67,12 @@ public class CloudConfigParser {
private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
"sftp.security.strictHostKeyChecking";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
+ private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
+ private static final int FIRST_SOURCE_INDEX = 0;
+
private final Properties systemEnvironment;
private final JsonObject jsonObject;
@@ -78,7 +100,7 @@ public class CloudConfigParser {
PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
.publishUrl(getAsString(feedConfig, "publish_url")) //
- .passWord(getAsString(feedConfig, "password")) //
+ .password(getAsString(feedConfig, "password")) //
.userName(getAsString(feedConfig, "username")) //
.trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
.trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
@@ -98,24 +120,51 @@ public class CloudConfigParser {
* Get the consumer configuration.
*
* @return the consumer configuration.
- * @throws DatafileTaskException if a member of the configuration is missing.
+ * @throws DatafileTaskException if the configuration is invalid.
*/
- public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
- JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
- Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
- if (topics.size() != 1) {
- throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
+ public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
+ try {
+ MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
+ MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
+ return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not parse message router consumer configuration", e);
}
- JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
- String topicUrl = getAsString(dmaapInfo, "topic_url");
-
- return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
- .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
- .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
- .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
- .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
- .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ }
+
+ private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
+ .build();
+ }
+
+ private SecurityKeys createSecurityKeys() throws DatafileTaskException {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
+ .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
+ .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
+ .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
+ .build();
+ }
+
+ private boolean isDmaapCertAuthEnabled(JsonObject config) {
+ return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
+ }
+
+ private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
+ Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
+ if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
+ throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
+ }
+ RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
+ MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
+ .sourceDefinition(parsedSource)
+ .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
+ .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
.build();
}
@@ -176,4 +225,8 @@ public class CloudConfigParser {
return get(obj, memberName).getAsJsonObject();
}
+ private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
+ return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
+ }
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
index 4db7963d..89fcf1c2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -1,113 +1,56 @@
/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 NOKIA Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
-import java.net.MalformedURLException;
-import java.net.URL;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+public class ConsumerConfiguration {
-@Value.Immutable
-@Value.Style(redactedMask = "####")
-@Gson.TypeAdapters
-public abstract class ConsumerConfiguration {
- @Value.Redacted
- public abstract String topicUrl();
+ private final MessageRouterSubscriberConfig messageRouterSubscriberConfig;
+ private final MessageRouterSubscriber messageRouterSubscriber;
+ private final MessageRouterSubscribeRequest messageRouterSubscribeRequest;
- public abstract String trustStorePath();
-
- public abstract String trustStorePasswordPath();
-
- public abstract String keyStorePath();
-
- public abstract String keyStorePasswordPath();
-
- public abstract Boolean enableDmaapCertAuth();
-
- /**
- * Gets the configuration in the SDK version.
- *
- * @return a <code>DmaapConsumerConfiguration</code> representing the configuration.
- *
- * @throws DatafileTaskException if something is wrong with the topic URL.
- */
- public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
- try {
- URL url = new URL(topicUrl());
- String passwd = "";
- String userName = "";
- if (url.getUserInfo() != null) {
- String[] userInfo = url.getUserInfo().split(":");
- userName = userInfo[0];
- passwd = userInfo[1];
- }
- String urlPath = url.getPath();
- DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
-
- return new ImmutableDmaapConsumerConfiguration.Builder() //
- .endpointUrl(topicUrl()) //
- .dmaapContentType("application/json") //
- .dmaapPortNumber(url.getPort()) //
- .dmaapHostName(url.getHost()) //
- .dmaapTopicName(path.dmaapTopicName) //
- .dmaapProtocol(url.getProtocol()) //
- .dmaapUserName(userName) //
- .dmaapUserPassword(passwd) //
- .trustStorePath(this.trustStorePath()) //
- .trustStorePasswordPath(this.trustStorePasswordPath()) //
- .keyStorePath(this.keyStorePath()) //
- .keyStorePasswordPath(this.keyStorePasswordPath()) //
- .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
- .consumerId(path.consumerId) //
- .consumerGroup(path.consumerGroup) //
- .timeoutMs(-1) //
- .messageLimit(-1) //
- .build();
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Could not parse the URL", e);
- }
+ public ConsumerConfiguration(MessageRouterSubscriberConfig messageRouterSubscriberConfig,
+ MessageRouterSubscriber messageRouterSubscriber, MessageRouterSubscribeRequest messageRouterSubscribeRequest) {
+ this.messageRouterSubscriberConfig = messageRouterSubscriberConfig;
+ this.messageRouterSubscriber = messageRouterSubscriber;
+ this.messageRouterSubscribeRequest = messageRouterSubscribeRequest;
}
- private class DmaapConsumerUrlPath {
- final String dmaapTopicName;
- final String consumerGroup;
- final String consumerId;
-
- DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
- this.dmaapTopicName = dmaapTopicName;
- this.consumerGroup = consumerGroup;
- this.consumerId = consumerId;
- }
+ public MessageRouterSubscriber getMessageRouterSubscriber() {
+ return messageRouterSubscriber;
}
- private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
- String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
- if (tokens.length != 5) {
- throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
- }
+ public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ return messageRouterSubscribeRequest;
+ }
- final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
- final String consumerGroup = tokens[3]; // OpenDcae-c12
- final String consumerId = tokens[4]; // C12
- return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ public MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() {
+ return messageRouterSubscriberConfig;
}
+ @Override
+ public String toString() {
+ return "ConsumerConfiguration{" + "securityKeys=" + messageRouterSubscriberConfig.securityKeys()
+ + ", consumerGroup=" + messageRouterSubscribeRequest.consumerGroup() + ", consumerID="
+ + messageRouterSubscribeRequest.consumerId() + '}';
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
deleted file mode 100644
index ad5f648d..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import reactor.core.publisher.Mono;
-
-/**
- * Handling the Consul connection.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- */
-class EnvironmentProcessor {
-
- private static final int DEFAULT_CONSUL_PORT = 8500;
- private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
-
- private EnvironmentProcessor() {
- }
-
- static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.trace("Loading configuration from system environment variables");
- EnvProperties envProperties;
- try {
- envProperties = ImmutableEnvProperties.builder() //
- .consulHost(getConsulHost(systemEnvironment)) //
- .consulPort(getConsultPort(systemEnvironment)) //
- .cbsName(getConfigBindingService(systemEnvironment)) //
- .appName(getService(systemEnvironment)) //
- .build();
- } catch (EnvironmentLoaderException e) {
- return Mono.error(e);
- }
- logger.trace("Evaluated environment system variables {}", envProperties);
- return Mono.just(envProperties);
- }
-
- private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
- }
-
- private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
- .map(Integer::valueOf) //
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
- }
-
- private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
- }
-
- private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional
- .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
- }
-
- private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
- return DEFAULT_CONSUL_PORT;
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index d7451bdb..fa7d7841 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,13 +17,10 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import java.net.MalformedURLException;
-import java.net.URL;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
@Value.Immutable
@Value.Style(redactedMask = "####")
@@ -36,7 +34,7 @@ public interface PublisherConfiguration {
String userName();
@Value.Redacted
- String passWord();
+ String password();
String trustStorePath();
@@ -50,30 +48,4 @@ public interface PublisherConfiguration {
String changeIdentifier();
- /**
- * Get the publisher configuration in SDK format.
- *
- * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration.
- * @throws MalformedURLException if the publish URL is malformed.
- */
- default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
- URL url = new URL(publishUrl());
- String urlPath = url.getPath();
-
- return new ImmutableDmaapPublisherConfiguration.Builder() //
- .endpointUrl(publishUrl()) //
- .dmaapContentType("application/octet-stream") //
- .dmaapPortNumber(url.getPort()) //
- .dmaapHostName(url.getHost()) //
- .dmaapTopicName(urlPath) //
- .dmaapProtocol(url.getProtocol()) //
- .dmaapUserName(this.userName()) //
- .dmaapUserPassword(this.passWord()) //
- .trustStorePath(this.trustStorePath()) //
- .trustStorePasswordPath(this.trustStorePasswordPath()) //
- .keyStorePath(this.keyStorePath()) //
- .keyStorePasswordPath(this.keyStorePasswordPath()) //
- .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
- .build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
deleted file mode 100644
index 5a8806b4..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.Builder;
-import reactor.core.publisher.Mono;
-
-/**
- * Web client for the DMaaP MessageRouter.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
- */
-public class DmaapWebClient {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class);
-
- private String contentType;
-
- /**
- * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
- *
- * @param dmaapCustomConfig - configuration object
- * @return DmaapReactiveWebClient
- */
- public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
- this.contentType = dmaapCustomConfig.dmaapContentType();
- return this;
- }
-
- /**
- * Construct Reactive WebClient with appropriate settings.
- *
- * @return WebClient
- */
- public WebClient build() {
- Builder webClientBuilder = WebClient.builder() //
- .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
- .filter(getRequestFilter()) //
- .filter(getResponseFilter());
- return webClientBuilder.build();
- }
-
- private ExchangeFilterFunction getResponseFilter() {
- return ExchangeFilterFunction.ofResponseProcessor(this::logResponse);
- }
-
- Mono<ClientResponse> logResponse(ClientResponse clientResponse) {
- MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
- logger.trace("Response Status {}", clientResponse.statusCode());
- MDC.remove(RESPONSE_CODE);
- return Mono.just(clientResponse);
- }
-
- private ExchangeFilterFunction getRequestFilter() {
- return ExchangeFilterFunction.ofRequestProcessor(this::logRequest);
- }
-
- Mono<ClientRequest> logRequest(ClientRequest clientRequest) {
- MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
- logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
- logger.trace("HTTP request headers: {}", clientRequest.headers());
- MDC.remove(SERVICE_NAME);
- return Mono.just(clientRequest);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index eed0f0bd..708865fa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ==================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -96,18 +96,17 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
- public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
- return rawMessage.flatMapMany(this::createMessageData);
+ public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) {
+ return rawMessage.flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
if (element.isJsonPrimitive()) {
- return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject());
} else if (element.isJsonObject()) {
return Optional.of((JsonObject) element);
} else {
- return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject());
}
}
@@ -117,9 +116,9 @@ public class JsonMessageParser {
}
/**
- * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}.
*
- * @param rawMessage - results from DMaaP
+ * @param jsonElement - result from DMaaP
* @return reactive Flux of FileReadyMessages
*/
private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index a46e17ba..9de37e8c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,10 +37,10 @@ import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -61,14 +61,14 @@ public class DmaapProducerHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final DmaapPublisherConfiguration configuration;
+ private final PublisherConfiguration configuration;
/**
* Constructor DmaapProducerReactiveHttpClient.
*
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
- public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ public DmaapProducerHttpClient(PublisherConfiguration dmaapPublisherConfiguration) {
this.configuration = dmaapPublisherConfiguration;
}
@@ -131,7 +131,7 @@ public class DmaapProducerHttpClient {
* @param request the request to add credentials to.
*/
public void addUserCredentialsToHead(HttpUriRequest request) {
- String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+ String plainCreds = configuration.userName() + ":" + configuration.password();
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 066983ae..0780e18e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,21 +22,14 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* Component used to get messages from the MessageRouter.
@@ -46,18 +40,14 @@ public class DMaaPMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
private final AppConfig datafileAppConfig;
private final JsonMessageParser jsonMessageParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
- this(datafileAppConfig, new JsonMessageParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+ this(datafileAppConfig, new JsonMessageParser());
}
- protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) {
this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = jsonMessageParser;
- this.httpClientFactory = httpClientFactory;
}
/**
@@ -68,21 +58,20 @@ public class DMaaPMessageConsumer {
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
try {
- DMaaPConsumerReactiveHttpClient client = createHttpClient();
- return consume((client.getDMaaPConsumerResponse(Optional.empty())));
- } catch (DatafileTaskException e) {
+ ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
+ MessageRouterSubscriber messageRouterSubscriber =
+ dmaapConsumerConfiguration.getMessageRouterSubscriber();
+ Flux<JsonElement> responseElements =
+ messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ return consume(responseElements);
+ } catch (Exception e) {
logger.warn("Unable to get response from message router", e);
return Flux.empty();
}
}
- private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
- logger.trace("consume called with arg {}", message);
- return jsonMessageParser.getMessagesFromJson(message);
- }
-
- public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
- return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
+ private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) {
+ return jsonMessageParser.getMessagesFromJson(messages);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index cfaf1753..8b86440a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.File;
-import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -113,7 +112,7 @@ public class DataRouterPublisher {
private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
+ JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
URI uri = new DefaultUriBuilderFactory(
datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
@@ -155,12 +154,8 @@ public class DataRouterPublisher {
}
DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
- try {
- DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
- return new DmaapProducerHttpClient(cfg);
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Cannot resolve producer client", e);
- }
+ PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier);
+ return new DmaapProducerHttpClient(publisherConfiguration);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 037803bd..a9973cf4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+* Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +22,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
@@ -113,12 +113,7 @@ public class PublishedChecker {
return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
- throws DatafileTaskException {
- try {
- return new DmaapProducerHttpClient(publisherConfig.toDmaap());
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Cannot create published checker client", e);
- }
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) {
+ return new DmaapProducerHttpClient(publisherConfig);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 42a6fea3..eba0a6cb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -302,8 +302,7 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
MDC.setContextMap(context);
- logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
- this.applicationConfiguration.getDmaapConsumerConfiguration());
+ logger.error("Polling for file ready message failed, exception: {}", exception.toString());
return Flux.empty();
}