diff options
author | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2019-04-10 13:04:11 +0300 |
---|---|---|
committer | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2019-04-10 13:04:11 +0300 |
commit | 47201b60365a87b993a8a93d0b1c1bb03cbca1d6 (patch) | |
tree | bd316f291b7208c33dc28a183fb7f34721250d3c /components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor | |
parent | 35cc15e04411008b2f8094bbd3876e7a2daed587 (diff) |
Support for HTTPS certificates-based communication with A&AI and DMaaP
Also, upgraded DCAE-SDK to the latest 1.1.4 version
Change-Id: Ica59ab3107d9c0bcbf4dbaacf5063d4ceb8ed4b9
Issue-ID: DCAEGEN2-1354
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor')
9 files changed, 176 insertions, 113 deletions
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java index 220466bc..69fbb3f0 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java @@ -91,8 +91,8 @@ import org.springframework.test.context.TestPropertySource; "configs.security.trustStorePasswordPath=test trust store password path", "configs.security.keyStorePath=test key store path", "configs.security.keyStorePasswordPath=test key store password path", - "configs.security.enableDmaapCertAuth=true", - "configs.security.enableAaiCertAuth=true", + "configs.security.enableDmaapCertAuth=false", + "configs.security.enableAaiCertAuth=false", "configs.application.pipelinesPollingIntervalSec=30", "configs.application.pipelinesTimeoutSec=15", "configs.application.policyVersion=1.0.0", @@ -208,6 +208,17 @@ class ApplicationConfigurationTest { () -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()), () -> assertEquals("cpeAuthControlName", configuration.getCpeAuthenticationCloseLoopControlName()) ); + + assertAll("Security Application Properties", + () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()), + () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), + () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("test key store password path", + aaiClientConfiguration.keyStorePasswordPath()), + () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("test trust store password path", + aaiClientConfiguration.trustStorePasswordPath()) + ); } @Test @@ -287,6 +298,12 @@ class ApplicationConfigurationTest { .cpeAuthConfigKey("config_key_2") .closeLoopConfigKey("config_key_3") .loggingLevel("TRACE") + .keyStorePath("test key store path - update") + .keyStorePasswordPath("test key store password path - update") + .trustStorePath("test trust store path - update") + .trustStorePasswordPath("test trust store password path - update") + .enableAaiCertAuth(true) + .enableDmaapCertAuth(true) .streamSubscribesMap(subscribes) .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3)) .build(); @@ -315,11 +332,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Consumer Re-Registration Configuration Properties", () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()), () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.PNF_UPDATE", + () -> assertEquals("events/unauthenticated.PNF_UPDATE", dmaapConsumerReRegistrationConfig.dmaapTopicName()), () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()), + () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()), () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()), () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()), () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()), @@ -332,11 +349,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Consumer CPE Authentication Configuration Properties", () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()), () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION", + () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION", dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()), () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), - () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()), + () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()), () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()), () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()), () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()), @@ -348,11 +365,11 @@ class ApplicationConfigurationTest { assertAll("DMaaP Publisher Configuration Properties", () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()), () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()), - () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT", + () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT", dmaapPublisherConfiguration.dmaapTopicName()), () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()), - () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()), + () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()), + () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()), () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType()) ); @@ -371,5 +388,16 @@ class ApplicationConfigurationTest { () -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()), () -> assertEquals("controlName-update", configuration.getCpeAuthenticationCloseLoopControlName()) ); + + assertAll("Security Application Properties", + () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()), + () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()), + () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()), + () -> assertEquals("test key store password path - update", + aaiClientConfiguration.keyStorePasswordPath()), + () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()), + () -> assertEquals("test trust store password path - update", + aaiClientConfiguration.trustStorePasswordPath()) + ); } }
\ No newline at end of file diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java index 9f5ce6dc..1acf864d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java @@ -84,6 +84,12 @@ class ConsulConfigurationGatewayTest { + "\"application.cpeAuth.configKey\": \"config_key_1\"," + "\"application.closeLoop.configKey\": \"config_key_3\"," + "\"application.loggingLevel\": \"TRACE\"," + + "\"application.ssl.keyStorePath\": \"/opt/app/bbs-event-processor/etc/cert/key.p12\"," + + "\"application.ssl.keyStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/key.pass\"," + + "\"application.ssl.trustStorePath\": \"/opt/app/bbs-event-processor/etc/cert/trust.jks\"," + + "\"application.ssl.trustStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/trust.pass\"," + + "\"application.ssl.enableAaiCertAuth\": true," + + "\"application.ssl.enableDmaapCertAuth\": true," + "\"streams_subscribes\": {" + "\"config_key_1\": {" + "\"type\": \"message_router\"," @@ -203,6 +209,12 @@ class ConsulConfigurationGatewayTest { .cpeAuthConfigKey("config_key_1") .closeLoopConfigKey("config_key_3") .loggingLevel("TRACE") + .keyStorePath("/opt/app/bbs-event-processor/etc/cert/key.p12") + .keyStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/key.pass") + .trustStorePath("/opt/app/bbs-event-processor/etc/cert/trust.jks") + .trustStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/trust.pass") + .enableAaiCertAuth(true) + .enableDmaapCertAuth(true) .streamSubscribesMap(subscribes) .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3)) .build(); diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java index 76d9659c..c4bef9df 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java @@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -// We can safely suppress unchecked assignment warnings for the ResponseEntity mock -@SuppressWarnings("unchecked") @DisplayName("CPE Authentication Pipeline Unit-Tests") class CpeAuthenticationPipelineTest { @@ -82,12 +80,12 @@ class CpeAuthenticationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private ResponseEntity<String> responseEntity; + private HttpResponse httpResponse; @BeforeEach void setup() { - responseEntity = Mockito.mock(ResponseEntity.class); + httpResponse = Mockito.mock(HttpResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class); @@ -268,13 +266,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -341,14 +339,14 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -452,13 +450,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -512,13 +510,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -574,13 +572,13 @@ class CpeAuthenticationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java index a1b6b148..9453db3d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java @@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; import org.onap.bbs.event.processor.tasks.AaiClientTask; import org.onap.bbs.event.processor.tasks.DmaapPublisherTask; import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -// We can safely suppress unchecked assignment warnings for the ResponseEntity mock -@SuppressWarnings("unchecked") @DisplayName("PNF Re-registration Pipeline Unit-Tests") class ReRegistrationPipelineTest { @@ -82,12 +80,12 @@ class ReRegistrationPipelineTest { private DmaapPublisherTask publisherTask; private AaiClientTask aaiClientTask; - private ResponseEntity<String> responseEntity; + private HttpResponse httpResponse; @BeforeEach void setup() { - responseEntity = Mockito.mock(ResponseEntity.class); + httpResponse = Mockito.mock(HttpResponse.class); configuration = Mockito.mock(ApplicationConfiguration.class); consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class); @@ -262,8 +260,8 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) @@ -312,13 +310,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -384,14 +382,14 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -491,13 +489,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2)) .thenReturn(Mono.just(hsiCfsServiceInstance2)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class)); @@ -551,13 +549,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString()); @@ -613,13 +611,13 @@ class ReRegistrationPipelineTest { .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl)) .thenReturn(Mono.just(hsiCfsServiceInstance)); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value())); - when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity)); + when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value()); + when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse)); // Execute the pipeline StepVerifier.create(pipeline.executePipeline()) .expectSubscription() - .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode())) + .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode())) .verifyComplete(); verify(aaiClientTask, times(2)) diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java index 538ff1de..40bcb65d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java @@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.net.ssl.SSLException; import org.junit.Assert; @@ -62,6 +67,7 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel; private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; private static String eventsArray; + private static Gson gson = new Gson(); @BeforeAll static void setUp() throws SSLException { @@ -108,22 +114,25 @@ class DmaapCpeAuthenticationConsumerTaskImplTest { @Test void passingEmptyMessage_NothingHappens() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just("")); + JsonElement empty = gson.toJsonTree(""); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } @Test void passingNormalMessage_ResponseSucceeds() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(eventsArray)); + JsonElement normalEventsArray = gson.toJsonTree(eventsArray); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) + .thenReturn(Mono.just(normalEventsArray)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java index 199a43ec..436206d2 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java @@ -30,6 +30,9 @@ import static org.mockito.Mockito.when; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; + +import javax.net.ssl.SSLException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -39,12 +42,12 @@ import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -112,46 +115,46 @@ class DmaapPublisherTaskImplTest { } @Test - void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException { - ResponseEntity<String> responseEntity = setupMocks(HttpStatus.OK.value()); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK); + void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException { + HttpResponse response = setupMocks(HttpStatus.OK.value()); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(responseEntity).verifyComplete(); + .expectNext(response).verifyComplete(); verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); verifyNoMoreInteractions(reactiveHttpClient); } @Test - void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException { - ResponseEntity<String> responseEntity = setupMocks(HttpStatus.UNAUTHORIZED.value()); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED); + void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException { + HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value()); + StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription() - .expectNext(responseEntity).verifyComplete(); + .expectNext(response).verifyComplete(); verify(reactiveHttpClient, times(1)) - .getDMaaPProducerResponse(controlLoopPublisherDmaapModel); + .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); verifyNoMoreInteractions(reactiveHttpClient); } // We can safely suppress unchecked assignment warning here since it is a mock class @SuppressWarnings("unchecked") - private ResponseEntity<String> setupMocks(Integer httpResponseCode) { + private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException { - ResponseEntity<String> responseEntity = mock(ResponseEntity.class); - when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode)); + HttpResponse response = mock(HttpResponse.class); + when(response.statusCode()).thenReturn(httpResponseCode); reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class); - when(reactiveHttpClient.getDMaaPProducerResponse(any())) - .thenReturn(Mono.just(responseEntity)); + when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class))) + .thenReturn(Mono.just(response)); PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class); doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration); task = new DmaapPublisherTaskImpl(configuration, httpClientFactory); - return responseEntity; + return response; } private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java index c9a461d8..72e28987 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java @@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import java.util.Optional; + import javax.net.ssl.SSLException; import org.junit.Assert; @@ -59,7 +64,8 @@ class DmaapReRegistrationConsumerTaskImplTest { private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask; private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel; private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; - private static String message; + private static String eventsArray; + private static Gson gson = new Gson(); @BeforeAll static void setUp() throws SSLException { @@ -91,12 +97,10 @@ class DmaapReRegistrationConsumerTaskImplTest { .sVlan(svlan) .build(); - message = String.format("[" + RE_REGISTRATION_EVENT_TEMPLATE + "]", - sourceName, - attachmentPoint, - remoteId, - cvlan, - svlan); + String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId, + cvlan, svlan); + + eventsArray = "[" + event + "]"; } @AfterEach @@ -105,23 +109,26 @@ class DmaapReRegistrationConsumerTaskImplTest { } @Test - void passingEmptyMessage_NothingHappens() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just("")); + void passingEmptyMessage_NothingHappens() { + JsonElement empty = gson.toJsonTree(""); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .expectError(EmptyDmaapResponseException.class); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } @Test - void passingNormalMessage_ResponseSucceeds() throws Exception { - when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message)); + void passingNormalMessage_ResponseSucceeds() { + JsonElement normalEventsArray = gson.toJsonTree(eventsArray); + when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())) + .thenReturn(Mono.just(normalEventsArray)); StepVerifier.create(dmaapConsumerTask.execute("Sample input")) .expectSubscription() .consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel)); - verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(); + verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty()); } private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() { diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java index 4ca61f5e..c7ad7935 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; +import java.io.StringReader; import java.util.Optional; import org.junit.jupiter.api.BeforeAll; @@ -100,14 +102,17 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { } @Test - void passingNonJson_EmptyFluxIsReturned() { + void passingNonJson_getIllegalStateException() { CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); + JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + jsonReader.setLenient(true); + JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson))) .expectSubscription() - .verifyComplete(); + .verifyError(IllegalStateException.class); } @Test @@ -116,7 +121,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() .verifyComplete(); } @@ -151,7 +156,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { .swVersion(swVersion) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedEventObject); } @@ -182,7 +187,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + secondEvent + "]"; + String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; CpeAuthenticationConsumerDmaapModel expectedFirstEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder() @@ -203,7 +208,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { .swVersion(swVersion) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedFirstEventObject) .expectNext(expectedSecondEventObject); @@ -229,7 +234,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -254,7 +259,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -279,7 +284,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -304,7 +309,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java index ca448dd0..cd238e2d 100644 --- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java +++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; +import java.io.StringReader; import java.util.Optional; import org.junit.jupiter.api.BeforeAll; @@ -89,21 +91,22 @@ class ReRegistrationDmaapConsumerJsonParserTest { } @Test - void passingNonJson_EmptyFluxIsReturned() { + void passingNonJson_getIllegalStateException() { ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); - - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON"))) + JsonReader jsonReader = new JsonReader(new StringReader("not JSON")); + jsonReader.setLenient(true); + JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive(); + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson))) .expectSubscription() - .verifyComplete(); + .verifyError(IllegalStateException.class); } @Test void passingNoEvents_EmptyFluxIsReturned() { ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser(); - - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]"))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]")))) .expectSubscription() .verifyComplete(); } @@ -135,7 +138,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { .sVlan(svlan) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedEventObject); } @@ -165,7 +168,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject())) .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2); - String eventsArray = "[" + firstEvent + secondEvent + "]"; + String eventsArray = "[" + firstEvent + "," + secondEvent + "]"; ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(correlationId1) @@ -182,7 +185,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { .sVlan(svlan) .build(); - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .expectNext(expectedFirstEventObject) .expectNext(expectedSecondEventObject); @@ -209,7 +212,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -235,7 +238,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -261,7 +264,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } @@ -289,7 +292,7 @@ class ReRegistrationDmaapConsumerJsonParserTest { String eventsArray = "[" + event + "]"; - StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray))) + StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray)))) .expectSubscription() .verifyComplete(); } |