aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRemigiusz Janeczek <remigiusz.janeczek@nokia.com>2020-08-18 12:48:19 +0200
committerRemigiusz Janeczek <remigiusz.janeczek@nokia.com>2020-08-25 16:24:17 +0200
commit3b6c2739505c097f3ea32475ebd3db41bbaae7ef (patch)
treec68d7182520ff7bd48dd046cbc03373d38235cc4
parent42a2932087a43b05cdcd2247108246b3ccdb0b2b (diff)
Update dcae SDK from 1.1.6 to 1.4.2
Bump project version from 1.4.2 to 1.4.3 Update deprecated calls to JsonParser Make logs always go to file and console Issue-ID: DCAEGEN2-2267 Signed-off-by: Remigiusz Janeczek <remigiusz.janeczek@nokia.com> Change-Id: Ib8d7f82b3daf03ca327581c9a5dc4f6f27a20141
-rw-r--r--datafile-app-server/config/application.yaml1
-rw-r--r--datafile-app-server/pom.xml12
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java93
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java133
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java91
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java34
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java91
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java13
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java11
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java5
-rw-r--r--datafile-app-server/src/main/resources/logback-spring.xml67
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java161
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java101
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java108
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java88
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java29
-rwxr-xr-xdatafile-app-server/src/test/resources/cert.jksbin0 -> 4888 bytes
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test.json13
-rw-r--r--datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json13
-rwxr-xr-xdatafile-app-server/src/test/resources/jks.pass1
-rwxr-xr-xdatafile-app-server/src/test/resources/trust.jksbin0 -> 1413 bytes
-rwxr-xr-xdatafile-app-server/src/test/resources/trust.pass1
-rw-r--r--pom.xml9
-rw-r--r--version.properties2
30 files changed, 376 insertions, 881 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index 69771e20..d3cced84 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -21,6 +21,7 @@ logging:
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.dcaegen2.collectors.datafile: WARN
+ org.onap.dcaegen2: WARN
file: /var/log/ONAP/application.log
app:
filepath: config/datafile_endpoints_test.json
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index b06ed824..896f5d25 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ ============LICENSE_START=======================================================
- ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved.
+ ~ Copyright (C) 2018-2020 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved.
~ Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
~ ================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,7 +25,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.4.2-SNAPSHOT</version>
+ <version>1.4.3-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.collectors.datafile</groupId>
@@ -47,6 +47,10 @@
<artifactId>dmaap-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
+ <artifactId>ssl</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
</dependency>
@@ -71,6 +75,10 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
<!-- Actuator dependencies -->
<dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index c257ceed..d933e337 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -41,8 +41,9 @@ import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticConte
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,12 +72,12 @@ public class AppConfig {
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
private ConsumerConfiguration dmaapConsumerConfiguration;
private Map<String, PublisherConfiguration> publishingConfigurations;
private FtpesConfig ftpesConfiguration;
private SftpConfig sftpConfiguration;
- @Value("#{systemEnvironment}")
- Properties systemEnvironment;
private Disposable refreshConfigTask = null;
@NotEmpty
@@ -102,8 +103,8 @@ public class AppConfig {
}
Flux<AppConfig> createRefreshTask(Map<String, String> context) {
- return getEnvironment(systemEnvironment, context) //
- .flatMap(this::createCbsClient) //
+ return createCbsClientConfiguration()
+ .flatMap(this::createCbsClient)
.flatMapMany(this::periodicConfigurationUpdates) //
.map(this::parseCloudConfig) //
.onErrorResume(this::onErrorResume);
@@ -175,19 +176,25 @@ public class AppConfig {
return Mono.empty();
}
- Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
- return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+ Mono<CbsClientConfiguration> createCbsClientConfiguration() {
+ try {
+ return Mono.just(CbsClientConfiguration.fromEnvironment());
+ } catch (CbsClientConfigurationException e) {
+ return Mono.error(e);
+ }
}
- Mono<CbsClient> createCbsClient(EnvProperties env) {
- return CbsClientFactory.createCbsClient(env);
+ Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
+ return CbsClientFactory.createCbsClient(cbsClientConfiguration);
}
private AppConfig parseCloudConfig(JsonObject configurationObject) {
try {
- CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment);
- setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
- parser.getFtpesConfig(), parser.getSftpConfig());
+ CloudConfigParser parser =
+ new CloudConfigParser(configurationObject, systemEnvironment);
+ setConfiguration(parser.getConsumerConfiguration(),
+ parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig(),
+ parser.getSftpConfig());
logConfig();
} catch (DatafileTaskException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
@@ -207,8 +214,7 @@ public class AppConfig {
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
try (InputStream inputStream = createInputStream(filepath)) {
- JsonParser parser = new JsonParser();
- JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
if (rootObject == null) {
throw new JsonSyntaxException("Root is not a json object");
}
@@ -228,8 +234,8 @@ public class AppConfig {
this.sftpConfiguration = sftpConfig;
}
- JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
- return parser.parse(new InputStreamReader(inputStream));
+ JsonElement getJsonElement(InputStream inputStream) {
+ return JsonParser.parseReader(new InputStreamReader(inputStream));
}
InputStream createInputStream(@NotNull String filepath) throws IOException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index a86a32b8..6ace4aae 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import javax.validation.constraints.NotNull;
+import io.vavr.collection.Stream;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
/**
* Parses the cloud configuration.
@@ -51,6 +67,12 @@ public class CloudConfigParser {
private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
"sftp.security.strictHostKeyChecking";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
+ private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
+ private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
+ private static final int FIRST_SOURCE_INDEX = 0;
+
private final Properties systemEnvironment;
private final JsonObject jsonObject;
@@ -78,7 +100,7 @@ public class CloudConfigParser {
PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
.publishUrl(getAsString(feedConfig, "publish_url")) //
- .passWord(getAsString(feedConfig, "password")) //
+ .password(getAsString(feedConfig, "password")) //
.userName(getAsString(feedConfig, "username")) //
.trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
.trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
@@ -98,24 +120,51 @@ public class CloudConfigParser {
* Get the consumer configuration.
*
* @return the consumer configuration.
- * @throws DatafileTaskException if a member of the configuration is missing.
+ * @throws DatafileTaskException if the configuration is invalid.
*/
- public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
- JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
- Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
- if (topics.size() != 1) {
- throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
+ public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
+ try {
+ MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
+ MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
+ return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not parse message router consumer configuration", e);
}
- JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
- String topicUrl = getAsString(dmaapInfo, "topic_url");
-
- return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
- .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
- .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
- .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
- .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
- .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ }
+
+ private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
+ .build();
+ }
+
+ private SecurityKeys createSecurityKeys() throws DatafileTaskException {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
+ .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
+ .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
+ .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
+ .build();
+ }
+
+ private boolean isDmaapCertAuthEnabled(JsonObject config) {
+ return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
+ }
+
+ private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
+ Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
+ if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
+ throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
+ }
+ RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
+ MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
+ .sourceDefinition(parsedSource)
+ .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
+ .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
.build();
}
@@ -176,4 +225,8 @@ public class CloudConfigParser {
return get(obj, memberName).getAsJsonObject();
}
+ private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
+ return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
+ }
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
index 4db7963d..89fcf1c2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
@@ -1,113 +1,56 @@
/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 NOKIA Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
-import java.net.MalformedURLException;
-import java.net.URL;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+public class ConsumerConfiguration {
-@Value.Immutable
-@Value.Style(redactedMask = "####")
-@Gson.TypeAdapters
-public abstract class ConsumerConfiguration {
- @Value.Redacted
- public abstract String topicUrl();
+ private final MessageRouterSubscriberConfig messageRouterSubscriberConfig;
+ private final MessageRouterSubscriber messageRouterSubscriber;
+ private final MessageRouterSubscribeRequest messageRouterSubscribeRequest;
- public abstract String trustStorePath();
-
- public abstract String trustStorePasswordPath();
-
- public abstract String keyStorePath();
-
- public abstract String keyStorePasswordPath();
-
- public abstract Boolean enableDmaapCertAuth();
-
- /**
- * Gets the configuration in the SDK version.
- *
- * @return a <code>DmaapConsumerConfiguration</code> representing the configuration.
- *
- * @throws DatafileTaskException if something is wrong with the topic URL.
- */
- public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
- try {
- URL url = new URL(topicUrl());
- String passwd = "";
- String userName = "";
- if (url.getUserInfo() != null) {
- String[] userInfo = url.getUserInfo().split(":");
- userName = userInfo[0];
- passwd = userInfo[1];
- }
- String urlPath = url.getPath();
- DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
-
- return new ImmutableDmaapConsumerConfiguration.Builder() //
- .endpointUrl(topicUrl()) //
- .dmaapContentType("application/json") //
- .dmaapPortNumber(url.getPort()) //
- .dmaapHostName(url.getHost()) //
- .dmaapTopicName(path.dmaapTopicName) //
- .dmaapProtocol(url.getProtocol()) //
- .dmaapUserName(userName) //
- .dmaapUserPassword(passwd) //
- .trustStorePath(this.trustStorePath()) //
- .trustStorePasswordPath(this.trustStorePasswordPath()) //
- .keyStorePath(this.keyStorePath()) //
- .keyStorePasswordPath(this.keyStorePasswordPath()) //
- .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
- .consumerId(path.consumerId) //
- .consumerGroup(path.consumerGroup) //
- .timeoutMs(-1) //
- .messageLimit(-1) //
- .build();
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Could not parse the URL", e);
- }
+ public ConsumerConfiguration(MessageRouterSubscriberConfig messageRouterSubscriberConfig,
+ MessageRouterSubscriber messageRouterSubscriber, MessageRouterSubscribeRequest messageRouterSubscribeRequest) {
+ this.messageRouterSubscriberConfig = messageRouterSubscriberConfig;
+ this.messageRouterSubscriber = messageRouterSubscriber;
+ this.messageRouterSubscribeRequest = messageRouterSubscribeRequest;
}
- private class DmaapConsumerUrlPath {
- final String dmaapTopicName;
- final String consumerGroup;
- final String consumerId;
-
- DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
- this.dmaapTopicName = dmaapTopicName;
- this.consumerGroup = consumerGroup;
- this.consumerId = consumerId;
- }
+ public MessageRouterSubscriber getMessageRouterSubscriber() {
+ return messageRouterSubscriber;
}
- private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
- String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
- if (tokens.length != 5) {
- throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
- }
+ public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ return messageRouterSubscribeRequest;
+ }
- final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
- final String consumerGroup = tokens[3]; // OpenDcae-c12
- final String consumerId = tokens[4]; // C12
- return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ public MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() {
+ return messageRouterSubscriberConfig;
}
+ @Override
+ public String toString() {
+ return "ConsumerConfiguration{" + "securityKeys=" + messageRouterSubscriberConfig.securityKeys()
+ + ", consumerGroup=" + messageRouterSubscribeRequest.consumerGroup() + ", consumerID="
+ + messageRouterSubscribeRequest.consumerId() + '}';
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
deleted file mode 100644
index ad5f648d..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import reactor.core.publisher.Mono;
-
-/**
- * Handling the Consul connection.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- */
-class EnvironmentProcessor {
-
- private static final int DEFAULT_CONSUL_PORT = 8500;
- private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
-
- private EnvironmentProcessor() {
- }
-
- static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) {
- MDC.setContextMap(contextMap);
- logger.trace("Loading configuration from system environment variables");
- EnvProperties envProperties;
- try {
- envProperties = ImmutableEnvProperties.builder() //
- .consulHost(getConsulHost(systemEnvironment)) //
- .consulPort(getConsultPort(systemEnvironment)) //
- .cbsName(getConfigBindingService(systemEnvironment)) //
- .appName(getService(systemEnvironment)) //
- .build();
- } catch (EnvironmentLoaderException e) {
- return Mono.error(e);
- }
- logger.trace("Evaluated environment system variables {}", envProperties);
- return Mono.just(envProperties);
- }
-
- private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
- }
-
- private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
- .map(Integer::valueOf) //
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
- }
-
- private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
- }
-
- private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional
- .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
- }
-
- private static Integer getDefaultPortOfConsul() {
- logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
- return DEFAULT_CONSUL_PORT;
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index d7451bdb..fa7d7841 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,13 +17,10 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import java.net.MalformedURLException;
-import java.net.URL;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
@Value.Immutable
@Value.Style(redactedMask = "####")
@@ -36,7 +34,7 @@ public interface PublisherConfiguration {
String userName();
@Value.Redacted
- String passWord();
+ String password();
String trustStorePath();
@@ -50,30 +48,4 @@ public interface PublisherConfiguration {
String changeIdentifier();
- /**
- * Get the publisher configuration in SDK format.
- *
- * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration.
- * @throws MalformedURLException if the publish URL is malformed.
- */
- default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
- URL url = new URL(publishUrl());
- String urlPath = url.getPath();
-
- return new ImmutableDmaapPublisherConfiguration.Builder() //
- .endpointUrl(publishUrl()) //
- .dmaapContentType("application/octet-stream") //
- .dmaapPortNumber(url.getPort()) //
- .dmaapHostName(url.getHost()) //
- .dmaapTopicName(urlPath) //
- .dmaapProtocol(url.getProtocol()) //
- .dmaapUserName(this.userName()) //
- .dmaapUserPassword(this.passWord()) //
- .trustStorePath(this.trustStorePath()) //
- .trustStorePasswordPath(this.trustStorePasswordPath()) //
- .keyStorePath(this.keyStorePath()) //
- .keyStorePasswordPath(this.keyStorePasswordPath()) //
- .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
- .build();
- }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
deleted file mode 100644
index 5a8806b4..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.Builder;
-import reactor.core.publisher.Mono;
-
-/**
- * Web client for the DMaaP MessageRouter.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
- */
-public class DmaapWebClient {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class);
-
- private String contentType;
-
- /**
- * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
- *
- * @param dmaapCustomConfig - configuration object
- * @return DmaapReactiveWebClient
- */
- public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
- this.contentType = dmaapCustomConfig.dmaapContentType();
- return this;
- }
-
- /**
- * Construct Reactive WebClient with appropriate settings.
- *
- * @return WebClient
- */
- public WebClient build() {
- Builder webClientBuilder = WebClient.builder() //
- .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
- .filter(getRequestFilter()) //
- .filter(getResponseFilter());
- return webClientBuilder.build();
- }
-
- private ExchangeFilterFunction getResponseFilter() {
- return ExchangeFilterFunction.ofResponseProcessor(this::logResponse);
- }
-
- Mono<ClientResponse> logResponse(ClientResponse clientResponse) {
- MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
- logger.trace("Response Status {}", clientResponse.statusCode());
- MDC.remove(RESPONSE_CODE);
- return Mono.just(clientResponse);
- }
-
- private ExchangeFilterFunction getRequestFilter() {
- return ExchangeFilterFunction.ofRequestProcessor(this::logRequest);
- }
-
- Mono<ClientRequest> logRequest(ClientRequest clientRequest) {
- MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
- logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
- logger.trace("HTTP request headers: {}", clientRequest.headers());
- MDC.remove(SERVICE_NAME);
- return Mono.just(clientRequest);
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index eed0f0bd..708865fa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START========================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ==================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -96,18 +96,17 @@ public class JsonMessageParser {
* @return a <code>Flux</code> containing messages.
*/
- public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
- return rawMessage.flatMapMany(this::createMessageData);
+ public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) {
+ return rawMessage.flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
if (element.isJsonPrimitive()) {
- return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject());
+ return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject());
} else if (element.isJsonObject()) {
return Optional.of((JsonObject) element);
} else {
- return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
+ return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject());
}
}
@@ -117,9 +116,9 @@ public class JsonMessageParser {
}
/**
- * Extract info from string and create a Flux of {@link FileReadyMessage}.
+ * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}.
*
- * @param rawMessage - results from DMaaP
+ * @param jsonElement - result from DMaaP
* @return reactive Flux of FileReadyMessages
*/
private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
index a46e17ba..9de37e8c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,10 +37,10 @@ import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.ssl.SSLContextBuilder;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -61,14 +61,14 @@ public class DmaapProducerHttpClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final DmaapPublisherConfiguration configuration;
+ private final PublisherConfiguration configuration;
/**
* Constructor DmaapProducerReactiveHttpClient.
*
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
- public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ public DmaapProducerHttpClient(PublisherConfiguration dmaapPublisherConfiguration) {
this.configuration = dmaapPublisherConfiguration;
}
@@ -131,7 +131,7 @@ public class DmaapProducerHttpClient {
* @param request the request to add credentials to.
*/
public void addUserCredentialsToHead(HttpUriRequest request) {
- String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword();
+ String plainCreds = configuration.userName() + ":" + configuration.password();
byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
String base64Creds = new String(base64CredsBytes);
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 066983ae..0780e18e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,21 +22,14 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* Component used to get messages from the MessageRouter.
@@ -46,18 +40,14 @@ public class DMaaPMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
private final AppConfig datafileAppConfig;
private final JsonMessageParser jsonMessageParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
- this(datafileAppConfig, new JsonMessageParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+ this(datafileAppConfig, new JsonMessageParser());
}
- protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) {
this.datafileAppConfig = datafileAppConfig;
this.jsonMessageParser = jsonMessageParser;
- this.httpClientFactory = httpClientFactory;
}
/**
@@ -68,21 +58,20 @@ public class DMaaPMessageConsumer {
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
try {
- DMaaPConsumerReactiveHttpClient client = createHttpClient();
- return consume((client.getDMaaPConsumerResponse(Optional.empty())));
- } catch (DatafileTaskException e) {
+ ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
+ MessageRouterSubscriber messageRouterSubscriber =
+ dmaapConsumerConfiguration.getMessageRouterSubscriber();
+ Flux<JsonElement> responseElements =
+ messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ return consume(responseElements);
+ } catch (Exception e) {
logger.warn("Unable to get response from message router", e);
return Flux.empty();
}
}
- private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
- logger.trace("consume called with arg {}", message);
- return jsonMessageParser.getMessagesFromJson(message);
- }
-
- public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
- return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
+ private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) {
+ return jsonMessageParser.getMessagesFromJson(messages);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index cfaf1753..8b86440a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +25,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.File;
-import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -113,7 +112,7 @@ public class DataRouterPublisher {
private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
- JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
+ JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
URI uri = new DefaultUriBuilderFactory(
datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
@@ -155,12 +154,8 @@ public class DataRouterPublisher {
}
DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
- try {
- DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
- return new DmaapProducerHttpClient(cfg);
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Cannot resolve producer client", e);
- }
+ PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier);
+ return new DmaapProducerHttpClient(publisherConfiguration);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
index 037803bd..a9973cf4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Nordix Foundation.
+* Copyright (C) 2020 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +22,6 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
@@ -113,12 +113,7 @@ public class PublishedChecker {
return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
- throws DatafileTaskException {
- try {
- return new DmaapProducerHttpClient(publisherConfig.toDmaap());
- } catch (MalformedURLException e) {
- throw new DatafileTaskException("Cannot create published checker client", e);
- }
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) {
+ return new DmaapProducerHttpClient(publisherConfig);
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 42a6fea3..eba0a6cb 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -302,8 +302,7 @@ public class ScheduledTasks {
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
MDC.setContextMap(context);
- logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
- this.applicationConfiguration.getDmaapConsumerConfiguration());
+ logger.error("Polling for file ready message failed, exception: {}", exception.toString());
return Flux.empty();
}
diff --git a/datafile-app-server/src/main/resources/logback-spring.xml b/datafile-app-server/src/main/resources/logback-spring.xml
index 1b9818d5..89405f2f 100644
--- a/datafile-app-server/src/main/resources/logback-spring.xml
+++ b/datafile-app-server/src/main/resources/logback-spring.xml
@@ -16,49 +16,28 @@
|%thread
|%n"/>
- <springProfile name="dev">
- <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>${defaultPattern}</pattern>
- </encoder>
- </appender>
+ <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${defaultPattern}</pattern>
+ </encoder>
+ </appender>
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${defaultPattern}</pattern>
- </encoder>
- <file>${logPath}/${outputFilename}.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
- <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
- <MaxFileSize>${maxFileSize}</MaxFileSize>
- <MaxHistory>${maxHistory}</MaxHistory>
- <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
- </rollingPolicy>
- </appender>
- <root level="ERROR">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
- </root>
- </springProfile>
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${defaultPattern}</pattern>
+ </encoder>
+ <file>${logPath}/${outputFilename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <MaxFileSize>${maxFileSize}</MaxFileSize>
+ <MaxHistory>${maxHistory}</MaxHistory>
+ <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
+ </rollingPolicy>
+ </appender>
+ <root level="ERROR">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
- <springProfile name="prod">
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${defaultPattern}</pattern>
- </encoder>
- <file>${logPath}/${outputFilename}.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
- <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
- <MaxFileSize>${maxFileSize}</MaxFileSize>
- <MaxHistory>${maxHistory}</MaxHistory>
- <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
- </rollingPolicy>
- </appender>
-
- <root level="ERROR">
- <appender-ref ref="ROLLING-FILE"/>
- </root>
- </springProfile>
-</configuration> \ No newline at end of file
+</configuration>
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index d0f02d69..dc8a1229 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,20 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
-
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import com.google.gson.JsonElement;
@@ -37,16 +25,6 @@ import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Properties;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,15 +32,33 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests the AppConfig.
*
@@ -73,50 +69,18 @@ public class AppConfigTest {
public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .endpointUrl(
- "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .timeoutMs(-1) //
- .dmaapHostName("localhost") //
- .dmaapUserName("dradmin") //
- .dmaapUserPassword("dradmin") //
- .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDcae-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
ImmutablePublisherConfiguration.builder() //
.publishUrl("https://localhost:3907/publish/1") //
.logUrl("https://localhost:3907/feedlog/1") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
+ .trustStorePath("src/test/resources/trust.jks") //
+ .trustStorePasswordPath("src/test/resources/trust.pass") //
+ .keyStorePath("src/test/resources/cert.jks") //
+ .keyStorePasswordPath("src/test/resources/jks.pass") //
.enableDmaapCertAuth(true) //
.changeIdentifier("PM_MEAS_FILES") //
.userName("CYE9fl40") //
- .passWord("izBJD8nLjawq0HMG") //
+ .password("izBJD8nLjawq0HMG") //
.build();
private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
@@ -127,35 +91,10 @@ public class AppConfigTest {
.trustedCaPasswordPath("/src/test/resources/ftp.jks.pass") //
.build();
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .endpointUrl("https://localhost:3907/publish/1") //
- .dmaapTopicName("/publish/1") //
- .dmaapUserPassword("izBJD8nLjawq0HMG") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/octet-stream") //
- .dmaapHostName("localhost") //
- .dmaapUserName("CYE9fl40") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static EnvProperties properties() {
- return ImmutableEnvProperties.builder() //
- .consulHost("host") //
- .consulPort(123) //
- .cbsName("cbsName") //
- .appName("appName") //
- .build();
- }
-
private AppConfig appConfigUnderTest;
private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
CbsClient cbsClient = mock(CbsClient.class);
+ CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class);
@BeforeEach
void setUp() {
@@ -175,13 +114,11 @@ public class AppConfigTest {
ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
Assertions.assertNotNull(consumerCfg);
- assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+ assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration);
PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
Assertions.assertNotNull(publisherCfg);
assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
- assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
assertThat(ftpesConfig).isNotNull();
@@ -245,7 +182,7 @@ public class AppConfigTest {
doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
- doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
+ doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
@@ -266,15 +203,13 @@ public class AppConfigTest {
.expectSubscription() //
.verifyComplete(); //
- assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException"));
}
@Test
public void whenPeriodicConfigRefreshNoConsul() {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> err = Flux.error(new IOException());
doReturn(err).when(cbsClient).updates(any(), any(), any());
@@ -292,9 +227,8 @@ public class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> json = Flux.just(getJsonRootObject());
doReturn(json).when(cbsClient).updates(any(), any(), any());
@@ -312,10 +246,8 @@ public class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> json = Flux.just(getJsonRootObject());
Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
@@ -334,8 +266,21 @@ public class AppConfigTest {
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
}
+ private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest =
+ consumerConfiguration.getMessageRouterSubscribeRequest();
+ assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
+ assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
+ assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
+ .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
+ SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
+ assertThat(securityKeys.keyStore().path().toString()).isEqualTo("src/test/resources/cert.jks");
+ assertThat(securityKeys.trustStore().path().toString()).isEqualTo("src/test/resources/trust.jks");
+ assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull();
+ }
+
private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
- JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject();
return rootObject;
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
deleted file mode 100644
index bdeb1c1e..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-
-public class ConsumerConfigurationTest {
- @Test
- public void toDmaapSuccess() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
- assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
- assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
- assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
- assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
- assertEquals("C12", dmaapConsumerConfiguration.consumerId());
- }
-
- @Test
- public void toDmaapNoUserInfoSuccess() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
- assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
- assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
- assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
- assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
- assertEquals("C12", dmaapConsumerConfiguration.consumerId());
- }
-
- @Test
- public void toDmaapWhenInvalidUrlThrowException() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl("//admin:admin@message-router.onap.svc.cluster.local:2222//events/").trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DatafileTaskException exception =
- assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
- assertEquals("Could not parse the URL", exception.getMessage());
- }
-
- @Test
- public void toDmaapWhenInvalidPathThrowException() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl("http://admin:admin@message-router.onap.svc.cluster.local:2222//events/") //
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DatafileTaskException exception =
- assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
- assertEquals("The path has incorrect syntax: //events/", exception.getMessage());
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
deleted file mode 100644
index d4e060ff..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
-@ExtendWith(MockitoExtension.class)
-class DmaapWebClientTest {
-
- @Mock
- private DmaapConsumerConfiguration dmaapConsumerConfigurationMock;
-
- @Mock
- private ClientResponse clientResponseMock;
-
- @Mock
- private ClientRequest clientRequesteMock;
-
- @Test
- void buildsDMaaPReactiveWebClientProperly() {
- when(dmaapConsumerConfigurationMock.dmaapContentType()).thenReturn("*/*");
- WebClient dmaapWebClientUndetTest = new DmaapWebClient() //
- .fromConfiguration(dmaapConsumerConfigurationMock) //
- .build();
-
- verify(dmaapConsumerConfigurationMock, times(1)).dmaapContentType();
- assertNotNull(dmaapWebClientUndetTest);
- }
-
- @Test
- public void logResponseSuccess() {
- DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
- when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK);
-
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
- Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock);
-
- assertEquals(clientResponseMock, logResponse.block());
-
- assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
- assertEquals("Response Status 200 OK", logAppender.list.get(0).getFormattedMessage());
-
- logAppender.stop();
- }
-
- @Test
- public void logRequestSuccess() throws URISyntaxException {
- when(clientRequesteMock.url()).thenReturn(new URI("http://test"));
- when(clientRequesteMock.method()).thenReturn(HttpMethod.GET);
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.add("header", "value");
- when(clientRequesteMock.headers()).thenReturn(httpHeaders);
-
- DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
- Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock);
-
- assertEquals(clientRequesteMock, logRequest.block());
-
- assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
- assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage());
- assertEquals(Level.TRACE, logAppender.list.get(1).getLevel());
- assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage());
-
- logAppender.stop();
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 8fb8c364..bfb9b13e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -46,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -120,11 +121,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -173,12 +174,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
- JsonElement jsonElement1 = new JsonParser().parse(messageString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
+ JsonElement jsonElement1 = JsonParser.parseString(messageString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement1)))
.expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
}
@@ -200,12 +201,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue(logAppender.list.toString()
@@ -232,12 +233,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -293,9 +294,9 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
- JsonElement jsonElement = new JsonParser().parse(messageString);
+ JsonElement jsonElement = JsonParser.parseString(messageString);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -317,12 +318,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -348,12 +349,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -374,12 +375,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -405,12 +406,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -438,12 +439,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -504,11 +505,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -520,12 +521,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -538,13 +539,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithNullJsonElement_noFileData() {
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse("{}");
+ JsonElement jsonElement = JsonParser.parseString("{}");
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -569,12 +570,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).expectComplete().verify();
assertTrue("Error missing in log",
@@ -601,11 +602,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index d4541efb..1ddb3a5c 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -50,10 +50,10 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
/**
* Test for DmaapProducerHttpClient.
@@ -73,7 +73,7 @@ class DmaapProducerHttpClientTest {
private DmaapProducerHttpClient producerClientUnderTestSpy;
- private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private PublisherConfiguration dmaapPublisherConfigurationMock = mock(PublisherConfiguration.class);
private HttpAsyncClientBuilderWrapper clientBuilderMock;
@@ -83,11 +83,8 @@ class DmaapProducerHttpClientTest {
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
- when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
+ when(dmaapPublisherConfigurationMock.userName()).thenReturn("dradmin");
+ when(dmaapPublisherConfigurationMock.password()).thenReturn("dradmin");
producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index d4dd89f0..f0c8e3b3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -21,26 +21,8 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -58,13 +40,27 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
public class DMaaPMessageConsumerTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
@@ -90,8 +86,6 @@ public class DMaaPMessageConsumerTest {
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- private DMaaPConsumerReactiveHttpClient httpClientMock;
-
private DMaaPMessageConsumer messageConsumer;
private static String ftpesMessageString;
private static JsonElement ftpesMessageJson;
@@ -105,6 +99,7 @@ public class DMaaPMessageConsumerTest {
private static AppConfig appConfig;
private static ConsumerConfiguration dmaapConsumerConfiguration;
+ private static MessageRouterSubscriber messageRouterSubscriber;
/**
* Sets up data for the test.
@@ -113,9 +108,6 @@ public class DMaaPMessageConsumerTest {
public static void setUp() {
appConfig = mock(AppConfig.class);
- dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
-
- JsonParser jsonParser = new JsonParser();
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
@@ -133,7 +125,7 @@ public class DMaaPMessageConsumerTest {
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+ ftpesMessageJson = JsonParser.parseString(ftpesMessageString);
MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
.productName(PRODUCT_NAME) //
@@ -175,7 +167,7 @@ public class DMaaPMessageConsumerTest {
.addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpMessageJson = jsonParser.parse(sftpMessageString);
+ sftpMessageJson = JsonParser.parseString(sftpMessageString);
sftpFileData = ImmutableFileData.builder() //
.name(PM_FILE_NAME) //
.location(SFTP_LOCATION) //
@@ -220,46 +212,50 @@ public class DMaaPMessageConsumerTest {
.expectError(DatafileTaskException.class) //
.verify();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
}
@Test
- public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenFtpes_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedFtpesMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+
+
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
@Test
- public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenSftp_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedSftpMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
FileReadyMessage fileReadyMessageAfterConsume) {
- Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
+ Flux<JsonElement> messageAsMono = message.isPresent() ? Flux.just(message.get()) : Flux.empty();
+
+ messageRouterSubscriber = mock(MessageRouterSubscriber.class);
+ dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class),
+ messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
+
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
- when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+ when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()))
+ .thenReturn(messageAsMono);
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- try {
- doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
- } catch (DatafileTaskException e) {
- e.printStackTrace();
- }
if (message.isPresent()) {
when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
@@ -268,7 +264,7 @@ public class DMaaPMessageConsumerTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
+ messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 1cb79bcf..199ac9f6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -66,6 +65,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
import org.slf4j.MDC;
@@ -110,7 +112,7 @@ public class ScheduledTasksTest {
.publishUrl(publishUrl) //
.logUrl("") //
.userName("userName") //
- .passWord("passWord") //
+ .password("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
@@ -118,13 +120,10 @@ public class ScheduledTasksTest {
.enableDmaapCertAuth(true) //
.changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
- .topicUrl("topicUrl").trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
+ final ConsumerConfiguration dmaapConsumerConfiguration =
+ new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+ mock(MessageRouterSubscribeRequest.class));
+
doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
@@ -266,7 +265,7 @@ public class ScheduledTasksTest {
.publishUrl(publishUrl) //
.logUrl("") //
.userName("userName") //
- .passWord("passWord") //
+ .password("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
@@ -274,13 +273,9 @@ public class ScheduledTasksTest {
.enableDmaapCertAuth(true) //
.changeIdentifier("Different changeIdentifier") //
.build(); //
- final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
- .topicUrl("topicUrl").trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
+ final ConsumerConfiguration dmaapConsumerConfiguration =
+ new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+ mock(MessageRouterSubscribeRequest.class));
doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
diff --git a/datafile-app-server/src/test/resources/cert.jks b/datafile-app-server/src/test/resources/cert.jks
new file mode 100755
index 00000000..ff0e95ce
--- /dev/null
+++ b/datafile-app-server/src/test/resources/cert.jks
Binary files differ
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
index 62119e61..8e51b807 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints_test.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json
@@ -5,11 +5,14 @@
"dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass",
"dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks",
"dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass",
- "dmaap.security.trustStorePath": "trustStorePath",
- "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
- "dmaap.security.keyStorePath": "keyStorePath",
- "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+ "dmaap.security.trustStorePath": "src/test/resources/trust.jks",
+ "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass",
+ "dmaap.security.keyStorePath": "src/test/resources/cert.jks",
+ "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass",
"dmaap.security.enableDmaapCertAuth": "true",
+ "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12",
+ "dmaap.dmaapConsumerConfiguration.consumerId": "C12",
+ "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000,
"sftp.security.strictHostKeyChecking": "false",
"streams_publishes": {
"PM_MEAS_FILES": {
@@ -27,7 +30,7 @@
"streams_subscribes": {
"dmaap_subscriber": {
"dmaap_info": {
- "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ "topic_url": "http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT"
},
"type": "message_router"
}
diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
index 480a6f79..e1327462 100644
--- a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
+++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
@@ -5,12 +5,15 @@
"dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass",
"dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks",
"dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass",
- "dmaap.security.trustStorePath": "trustStorePath",
- "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
- "dmaap.security.keyStorePath": "keyStorePath",
- "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+ "dmaap.security.trustStorePath": "src/test/resources/trust.jks",
+ "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass",
+ "dmaap.security.keyStorePath": "src/test/resources/cert.jks",
+ "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass",
"dmaap.security.enableDmaapCertAuth": "true",
"sftp.security.strictHostKeyChecking": "false",
+ "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12",
+ "dmaap.dmaapConsumerConfiguration.consumerId": "C12",
+ "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000,
"streams_publishes": {
"PM_MEAS_FILES": {
"type": "data_router",
@@ -49,7 +52,7 @@
"streams_subscribes": {
"dmaap_subscriber": {
"dmaap_info": {
- "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ "topic_url": "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
},
"type": "message_router"
}
diff --git a/datafile-app-server/src/test/resources/jks.pass b/datafile-app-server/src/test/resources/jks.pass
new file mode 100755
index 00000000..b2c3df47
--- /dev/null
+++ b/datafile-app-server/src/test/resources/jks.pass
@@ -0,0 +1 @@
+hD:!w:CxF]lGvM6Mz9l^j[7U \ No newline at end of file
diff --git a/datafile-app-server/src/test/resources/trust.jks b/datafile-app-server/src/test/resources/trust.jks
new file mode 100755
index 00000000..fc62ad2f
--- /dev/null
+++ b/datafile-app-server/src/test/resources/trust.jks
Binary files differ
diff --git a/datafile-app-server/src/test/resources/trust.pass b/datafile-app-server/src/test/resources/trust.pass
new file mode 100755
index 00000000..047a411a
--- /dev/null
+++ b/datafile-app-server/src/test/resources/trust.pass
@@ -0,0 +1 @@
+jeQ2l]iyB62D{WbSHL]dN*8R \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a807a426..8febb283 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<groupId>org.onap.dcaegen2.collectors</groupId>
<artifactId>datafile</artifactId>
- <version>1.4.2-SNAPSHOT</version>
+ <version>1.4.3-SNAPSHOT</version>
<name>dcaegen2-collectors.datafile</name>
<description>datafile collector</description>
@@ -45,7 +45,7 @@
<properties>
<java.version>11</java.version>
- <sdk.version>1.1.6</sdk.version>
+ <sdk.version>1.4.2</sdk.version>
<apache.httpcomponents.version>4.1.4</apache.httpcomponents.version>
<apache.commons.version>3.6</apache.commons.version>
<immutable.version>2.7.1</immutable.version>
@@ -91,6 +91,11 @@
<version>${sdk.version}</version>
</dependency>
<dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
+ <artifactId>ssl</artifactId>
+ <version>${sdk.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>${apache.httpcomponents.version}</version>
diff --git a/version.properties b/version.properties
index b0c0f623..69a4fe17 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
major=1
minor=4
-patch=2
+patch=3
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT