aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/test/java')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java161
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java101
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java108
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java13
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java88
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java29
7 files changed, 144 insertions, 419 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index d0f02d69..dc8a1229 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,20 +16,8 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
-
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import com.google.gson.JsonElement;
@@ -37,16 +25,6 @@ import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Properties;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,15 +32,33 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests the AppConfig.
*
@@ -73,50 +69,18 @@ public class AppConfigTest {
public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .endpointUrl(
- "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .timeoutMs(-1) //
- .dmaapHostName("localhost") //
- .dmaapUserName("dradmin") //
- .dmaapUserPassword("dradmin") //
- .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDcae-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
ImmutablePublisherConfiguration.builder() //
.publishUrl("https://localhost:3907/publish/1") //
.logUrl("https://localhost:3907/feedlog/1") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
+ .trustStorePath("src/test/resources/trust.jks") //
+ .trustStorePasswordPath("src/test/resources/trust.pass") //
+ .keyStorePath("src/test/resources/cert.jks") //
+ .keyStorePasswordPath("src/test/resources/jks.pass") //
.enableDmaapCertAuth(true) //
.changeIdentifier("PM_MEAS_FILES") //
.userName("CYE9fl40") //
- .passWord("izBJD8nLjawq0HMG") //
+ .password("izBJD8nLjawq0HMG") //
.build();
private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
@@ -127,35 +91,10 @@ public class AppConfigTest {
.trustedCaPasswordPath("/src/test/resources/ftp.jks.pass") //
.build();
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .endpointUrl("https://localhost:3907/publish/1") //
- .dmaapTopicName("/publish/1") //
- .dmaapUserPassword("izBJD8nLjawq0HMG") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/octet-stream") //
- .dmaapHostName("localhost") //
- .dmaapUserName("CYE9fl40") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static EnvProperties properties() {
- return ImmutableEnvProperties.builder() //
- .consulHost("host") //
- .consulPort(123) //
- .cbsName("cbsName") //
- .appName("appName") //
- .build();
- }
-
private AppConfig appConfigUnderTest;
private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
CbsClient cbsClient = mock(CbsClient.class);
+ CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class);
@BeforeEach
void setUp() {
@@ -175,13 +114,11 @@ public class AppConfigTest {
ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
Assertions.assertNotNull(consumerCfg);
- assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+ assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration);
PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
Assertions.assertNotNull(publisherCfg);
assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
- assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
assertThat(ftpesConfig).isNotNull();
@@ -245,7 +182,7 @@ public class AppConfigTest {
doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
- doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
+ doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
@@ -266,15 +203,13 @@ public class AppConfigTest {
.expectSubscription() //
.verifyComplete(); //
- assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException"));
}
@Test
public void whenPeriodicConfigRefreshNoConsul() {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> err = Flux.error(new IOException());
doReturn(err).when(cbsClient).updates(any(), any(), any());
@@ -292,9 +227,8 @@ public class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> json = Flux.just(getJsonRootObject());
doReturn(json).when(cbsClient).updates(any(), any(), any());
@@ -312,10 +246,8 @@ public class AppConfigTest {
@Test
public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
- EnvProperties props = properties();
- doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
-
- doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+ doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration();
+ doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration);
Flux<JsonObject> json = Flux.just(getJsonRootObject());
Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
@@ -334,8 +266,21 @@ public class AppConfigTest {
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
}
+ private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest =
+ consumerConfiguration.getMessageRouterSubscribeRequest();
+ assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
+ assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
+ assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
+ .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
+ SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
+ assertThat(securityKeys.keyStore().path().toString()).isEqualTo("src/test/resources/cert.jks");
+ assertThat(securityKeys.trustStore().path().toString()).isEqualTo("src/test/resources/trust.jks");
+ assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull();
+ }
+
private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
- JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject();
return rootObject;
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
deleted file mode 100644
index bdeb1c1e..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-
-public class ConsumerConfigurationTest {
- @Test
- public void toDmaapSuccess() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
- assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
- assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
- assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
- assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
- assertEquals("C12", dmaapConsumerConfiguration.consumerId());
- }
-
- @Test
- public void toDmaapNoUserInfoSuccess() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl(
- "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
- assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
- assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
- assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
- assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
- assertEquals("C12", dmaapConsumerConfiguration.consumerId());
- }
-
- @Test
- public void toDmaapWhenInvalidUrlThrowException() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl("//admin:admin@message-router.onap.svc.cluster.local:2222//events/").trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DatafileTaskException exception =
- assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
- assertEquals("Could not parse the URL", exception.getMessage());
- }
-
- @Test
- public void toDmaapWhenInvalidPathThrowException() throws DatafileTaskException {
- ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
- .topicUrl("http://admin:admin@message-router.onap.svc.cluster.local:2222//events/") //
- .trustStorePath("") //
- .trustStorePasswordPath("") //
- .keyStorePath("") //
- .keyStorePasswordPath("") //
- .enableDmaapCertAuth(Boolean.FALSE) //
- .build();
-
- DatafileTaskException exception =
- assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
- assertEquals("The path has incorrect syntax: //events/", exception.getMessage());
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
deleted file mode 100644
index d4e060ff..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.ClientRequest;
-import org.springframework.web.reactive.function.client.ClientResponse;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
-@ExtendWith(MockitoExtension.class)
-class DmaapWebClientTest {
-
- @Mock
- private DmaapConsumerConfiguration dmaapConsumerConfigurationMock;
-
- @Mock
- private ClientResponse clientResponseMock;
-
- @Mock
- private ClientRequest clientRequesteMock;
-
- @Test
- void buildsDMaaPReactiveWebClientProperly() {
- when(dmaapConsumerConfigurationMock.dmaapContentType()).thenReturn("*/*");
- WebClient dmaapWebClientUndetTest = new DmaapWebClient() //
- .fromConfiguration(dmaapConsumerConfigurationMock) //
- .build();
-
- verify(dmaapConsumerConfigurationMock, times(1)).dmaapContentType();
- assertNotNull(dmaapWebClientUndetTest);
- }
-
- @Test
- public void logResponseSuccess() {
- DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
- when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK);
-
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
- Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock);
-
- assertEquals(clientResponseMock, logResponse.block());
-
- assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
- assertEquals("Response Status 200 OK", logAppender.list.get(0).getFormattedMessage());
-
- logAppender.stop();
- }
-
- @Test
- public void logRequestSuccess() throws URISyntaxException {
- when(clientRequesteMock.url()).thenReturn(new URI("http://test"));
- when(clientRequesteMock.method()).thenReturn(HttpMethod.GET);
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.add("header", "value");
- when(clientRequesteMock.headers()).thenReturn(httpHeaders);
-
- DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
-
- final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
- Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock);
-
- assertEquals(clientRequesteMock, logRequest.block());
-
- assertEquals(Level.TRACE, logAppender.list.get(0).getLevel());
- assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage());
- assertEquals(Level.TRACE, logAppender.list.get(1).getLevel());
- assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage());
-
- logAppender.stop();
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 8fb8c364..bfb9b13e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -46,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -120,11 +121,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -173,12 +174,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
String messageString = "[" + parsedString + "," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
- JsonElement jsonElement1 = new JsonParser().parse(messageString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
+ JsonElement jsonElement1 = JsonParser.parseString(messageString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1)))
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement1)))
.expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
}
@@ -200,12 +201,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue(logAppender.list.toString()
@@ -232,12 +233,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -293,9 +294,9 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
String messageString = "[{\"event\":{}}," + parsedString + "]";
JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
- JsonElement jsonElement = new JsonParser().parse(messageString);
+ JsonElement jsonElement = JsonParser.parseString(messageString);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -317,12 +318,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -348,12 +349,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -374,12 +375,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -405,12 +406,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -438,12 +439,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).verifyComplete();
assertTrue("Error missing in log",
@@ -504,11 +505,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNext(expectedMessage).verifyComplete();
}
@@ -520,12 +521,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -538,13 +539,13 @@ class JsonMessageParserTest {
@Test
void whenPassingJsonWithNullJsonElement_noFileData() {
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse("{}");
+ JsonElement jsonElement = JsonParser.parseString("{}");
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
assertTrue("Error missing in log",
@@ -569,12 +570,12 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectNextCount(0).expectComplete().verify();
assertTrue("Error missing in log",
@@ -601,11 +602,11 @@ class JsonMessageParserTest {
String parsedString = message.getParsed();
JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
- JsonElement jsonElement = new JsonParser().parse(parsedString);
+ JsonElement jsonElement = JsonParser.parseString(parsedString);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
.getJsonObjectFromAnArray(jsonElement);
- StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+ StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
.expectComplete().verify();
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index d4541efb..1ddb3a5c 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -50,10 +50,10 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
/**
* Test for DmaapProducerHttpClient.
@@ -73,7 +73,7 @@ class DmaapProducerHttpClientTest {
private DmaapProducerHttpClient producerClientUnderTestSpy;
- private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private PublisherConfiguration dmaapPublisherConfigurationMock = mock(PublisherConfiguration.class);
private HttpAsyncClientBuilderWrapper clientBuilderMock;
@@ -83,11 +83,8 @@ class DmaapProducerHttpClientTest {
@BeforeEach
void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
- when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
- when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
- when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
+ when(dmaapPublisherConfigurationMock.userName()).thenReturn("dradmin");
+ when(dmaapPublisherConfigurationMock.password()).thenReturn("dradmin");
producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock));
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index d4dd89f0..f0c8e3b3 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -21,26 +21,8 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -58,13 +40,27 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
public class DMaaPMessageConsumerTest {
private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
private static final String PRODUCT_NAME = "NrRadio";
@@ -90,8 +86,6 @@ public class DMaaPMessageConsumerTest {
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
- private DMaaPConsumerReactiveHttpClient httpClientMock;
-
private DMaaPMessageConsumer messageConsumer;
private static String ftpesMessageString;
private static JsonElement ftpesMessageJson;
@@ -105,6 +99,7 @@ public class DMaaPMessageConsumerTest {
private static AppConfig appConfig;
private static ConsumerConfiguration dmaapConsumerConfiguration;
+ private static MessageRouterSubscriber messageRouterSubscriber;
/**
* Sets up data for the test.
@@ -113,9 +108,6 @@ public class DMaaPMessageConsumerTest {
public static void setUp() {
appConfig = mock(AppConfig.class);
- dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
-
- JsonParser jsonParser = new JsonParser();
AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
.location(FTPES_LOCATION) //
@@ -133,7 +125,7 @@ public class DMaaPMessageConsumerTest {
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+ ftpesMessageJson = JsonParser.parseString(ftpesMessageString);
MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
.productName(PRODUCT_NAME) //
@@ -175,7 +167,7 @@ public class DMaaPMessageConsumerTest {
.addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpMessageJson = jsonParser.parse(sftpMessageString);
+ sftpMessageJson = JsonParser.parseString(sftpMessageString);
sftpFileData = ImmutableFileData.builder() //
.name(PM_FILE_NAME) //
.location(SFTP_LOCATION) //
@@ -220,46 +212,50 @@ public class DMaaPMessageConsumerTest {
.expectError(DatafileTaskException.class) //
.verify();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
}
@Test
- public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenFtpes_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedFtpesMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+
+
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
@Test
- public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
+ public void whenSftp_ReturnsCorrectResponse() {
prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
.expectNext(expectedSftpMessage) //
.verifyComplete();
- verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
- verifyNoMoreInteractions(httpClientMock);
+ verify(messageRouterSubscriber, times(1))
+ .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+ verifyNoMoreInteractions(messageRouterSubscriber);
}
private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
FileReadyMessage fileReadyMessageAfterConsume) {
- Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
+ Flux<JsonElement> messageAsMono = message.isPresent() ? Flux.just(message.get()) : Flux.empty();
+
+ messageRouterSubscriber = mock(MessageRouterSubscriber.class);
+ dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class),
+ messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
+
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
- when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+ when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()))
+ .thenReturn(messageAsMono);
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- try {
- doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
- } catch (DatafileTaskException e) {
- e.printStackTrace();
- }
if (message.isPresent()) {
when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
@@ -268,7 +264,7 @@ public class DMaaPMessageConsumerTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
+ messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 1cb79bcf..199ac9f6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -66,6 +65,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
import org.slf4j.MDC;
@@ -110,7 +112,7 @@ public class ScheduledTasksTest {
.publishUrl(publishUrl) //
.logUrl("") //
.userName("userName") //
- .passWord("passWord") //
+ .password("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
@@ -118,13 +120,10 @@ public class ScheduledTasksTest {
.enableDmaapCertAuth(true) //
.changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
- .topicUrl("topicUrl").trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
+ final ConsumerConfiguration dmaapConsumerConfiguration =
+ new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+ mock(MessageRouterSubscribeRequest.class));
+
doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
@@ -266,7 +265,7 @@ public class ScheduledTasksTest {
.publishUrl(publishUrl) //
.logUrl("") //
.userName("userName") //
- .passWord("passWord") //
+ .password("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
@@ -274,13 +273,9 @@ public class ScheduledTasksTest {
.enableDmaapCertAuth(true) //
.changeIdentifier("Different changeIdentifier") //
.build(); //
- final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
- .topicUrl("topicUrl").trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
+ final ConsumerConfiguration dmaapConsumerConfiguration =
+ new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
+ mock(MessageRouterSubscribeRequest.class));
doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();