From a122d0a0a7075163fad4865143fedf7b6fe511d1 Mon Sep 17 00:00:00 2001 From: wasala Date: Tue, 25 Sep 2018 12:24:48 +0200 Subject: PRH DMaaP objects batching *Getting collection of object in one request *Refator the workflow in the old implementation Change-Id: I4fdbf4bd8ae70cd78dbf5c3c441ba01c28e6ce4f Issue-ID: DCAEGEN2-834 Signed-off-by: wasala --- pom.xml | 2 +- .../producer/AaiProducerReactiveHttpClient.java | 5 +- .../prh/service/DmaapConsumerJsonParser.java | 28 +- .../services/prh/tasks/DmaapConsumerTask.java | 5 +- .../services/prh/tasks/DmaapConsumerTaskImpl.java | 5 +- .../services/prh/tasks/ScheduledTasks.java | 16 +- .../prh/service/DmaapConsumerJsonParserTest.java | 482 +++++++++++---------- .../prh/tasks/DmaapConsumerTaskImplTest.java | 61 +-- 8 files changed, 333 insertions(+), 271 deletions(-) diff --git a/pom.xml b/pom.xml index f563de5b..1749eaf2 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ org.onap.dcaegen2.services prh - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT dcaegen2-services-prh PNF Registration Handler diff --git a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java index 665d65a3..43f6ce91 100644 --- a/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java +++ b/prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java @@ -27,7 +27,6 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.X_ONAP_R import java.net.URI; import java.util.UUID; - import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.MDC; @@ -37,8 +36,6 @@ import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; - - public class AaiProducerReactiveHttpClient { private WebClient webClient; @@ -90,4 +87,4 @@ public class AaiProducerReactiveHttpClient { return new DefaultUriBuilderFactory().builder().scheme(aaiProtocol).host(aaiHost).port(aaiHostPortNumber) .path(aaiBasePath + aaiPnfPath + "/" + pnfName).build(); } -} +} \ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 1d121b38..aed99747 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Mono getJsonObject(Mono monoMessage) { + public Flux getJsonObject(Mono monoMessage) { return monoMessage - .flatMap(this::getJsonParserMessage) + .flatMapMany(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } @@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser { : Mono.fromCallable(() -> new JsonParser().parse(message)); } - private Mono createJsonConsumerModel(JsonElement jsonElement) { + private Flux createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) + ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) : getConsumerDmaapModelFromJsonArray(jsonElement); } - private Mono getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + private Flux getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { return create( - Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new))); + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } public Optional getJsonObjectFromAnArray(JsonElement element) { - return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); + JsonParser jsonParser = new JsonParser(); + return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } - private Mono create(Mono jsonObject) { + private Flux create(Flux jsonObject) { return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)) + .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); } private Mono transform(JsonObject responseFromDmaap) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index a6baf4a5..4cde2257 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -32,7 +33,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono consume(Mono message); + abstract Flux consume(Mono message); abstract DMaaPConsumerReactiveHttpClient resolveClient(); @@ -40,7 +41,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono execute(String object); + protected abstract Flux execute(String object); WebClient buildWebClient() { return new DMaaPReactiveWebClient().build(); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 341a229b..3a5f213c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -54,12 +55,12 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } @Override - Mono consume(Mono message) { + Flux consume(Mono message) { return dmaapConsumerJsonParser.getJsonObject(message); } @Override - public Mono execute(String object) { + public Flux execute(String object) { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index de7837ec..08767428 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; @@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -83,7 +85,13 @@ public class ScheduledTasks { logger.warn("Nothing to consume from DMaaP") ) .flatMap(this::publishToAaiConfiguration) + .doOnError(exception -> + logger.warn("AAIProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .flatMap(this::publishToDmaapConfiguration) + .doOnError(exception -> + logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .doOnTerminate(mainCountDownLatch::countDown) .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -113,8 +121,8 @@ public class ScheduledTasks { } - private Mono consumeFromDMaaPMessage() { - return Mono.defer(() -> { + private Flux consumeFromDMaaPMessage() { + return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); logger.info(INVOKE, "Init configs"); @@ -138,4 +146,8 @@ public class ScheduledTasks { return Mono.error(e); } } + + private Predicate resumePrhPredicate() { + return exception -> exception instanceof PrhTaskException; + } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java index 225d46ee..01ce7419 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java @@ -28,7 +28,6 @@ import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import reactor.core.publisher.Mono; @@ -43,36 +42,36 @@ class DmaapConsumerJsonParserTest { void whenPassingCorrectJson_validationNotThrowingAnException() { //given String message = "[{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}]"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}]"; String parsed = "{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}"; ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234") .ipv6("0:0:0:0:0:FFFF:0A10:7BEA") @@ -83,7 +82,7 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser - .getJsonObject(Mono.just((message))).block(); + .getJsonObject(Mono.just((message))).blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); @@ -93,34 +92,34 @@ class DmaapConsumerJsonParserTest { void whenPassingCorrectJsonWithoutIpv4_validationNotThrowingAnException() { //given String message = "[{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}]"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}]"; String parsed = "{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}"; //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); @@ -129,7 +128,7 @@ class DmaapConsumerJsonParserTest { .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); dmaapConsumerJsonParser.getJsonObject(Mono.just((message))); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message))) - .block(); + .blockFirst(); //then ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("") .ipv6("0:0:0:0:0:FFFF:0A10:7BEA") @@ -142,34 +141,34 @@ class DmaapConsumerJsonParserTest { void whenPassingCorrectJsonWithoutIpv6_validationNotThrowingAnException() { //given String message = "[{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"" - + "}}}]"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}]"; String parsed = "{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"" - + "}}}"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}"; ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234").ipv6("") .correlationId("NOKQTFCOC540002E").build(); @@ -179,55 +178,112 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message))) - .block(); + .blockFirst(); //then Assertions.assertNotNull(consumerDmaapModel); Assertions.assertEquals(expectedObject, consumerDmaapModel); } @Test - void whenPassingCorrectJsonWithoutIpv4andIpv6_validationThrowingAnException() { + void whenPassingCorrectJsonWithoutIpv4andIpv6_validationAddingAnException() { //given String message = "[{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"" - + "}}}]"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}]"; String parsed = "{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"" - + "}}}"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}"; DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = new JsonParser().parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + .expectSubscription().thenRequest(1).verifyComplete(); } @Test - void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() { + void whenPassingJsonWithoutMandatoryHeaderInformation_validationAddingAnException() { String parsed = "{\"event\": {" + + "\"commonEventHeader\": {}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}"; + + DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonElement jsonElement = new JsonParser().parse(parsed); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); + String incorrectMessage = "[{\"event\": {" + + "\"commonEventHeader\": {}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"" + + "}}}]"; + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage))) + .expectSubscription().thenRequest(1).verifyComplete(); + } + + @Test + void whenPassingJsonWithoutSourceName_validationAddingAnException() { + String parsed = "{\"event\": {" + + "\"commonEventHeader\": {}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}"; + + DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); + JsonElement jsonElement = new JsonParser().parse(parsed); + Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); + String jsonWithoutSourceName = + "[{\"event\": {" + "\"commonEventHeader\": {}," + "\"pnfRegistrationFields\": {" + " \"unitType\": \"AirScale\"," @@ -238,15 +294,41 @@ class DmaapConsumerJsonParserTest { + " \"lastServiceDate\": \"1535014037024\"," + " \"unitFamily\": \"BBU\"," + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"" + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}]"; + StepVerifier + .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceName))) + .expectSubscription().thenRequest(1) + .verifyComplete(); + } + + @Test + void whenPassingJsonWithoutIpInformation_validationAddingAnException() { + String parsed = + "{\"event\": {" + + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV4IpAddress\": \"\"," + + " \"oamV6IpAddress\": \"\"" + "}}}"; DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = new JsonParser().parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String incorrectMessage = "[{\"event\": {" - + "\"commonEventHeader\": {}," + String jsonWithoutIpInformation = + "[{\"event\": {" + + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"}," + "\"pnfRegistrationFields\": {" + " \"unitType\": \"AirScale\"," + " \"serialNumber\": \"QTFCOC540002E\"," @@ -256,16 +338,19 @@ class DmaapConsumerJsonParserTest { + " \"lastServiceDate\": \"1535014037024\"," + " \"unitFamily\": \"BBU\"," + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"" + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV4IpAddress\": \"\"," + + " \"oamV6IpAddress\": \"\"" + "}}}]"; - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation))) + .expectSubscription().thenRequest(1).verifyComplete(); } @Test - void whenPassingJsonWithoutSourceName_validationThrowingAnException() { - String parsed = "{\"event\": {" - + "\"commonEventHeader\": {}," + void whenPassingJsonWithoutSourceNameValue_validationAddingAnException() { + String parsed = + "{\"event\": {" + + "\"commonEventHeader\": {\"sourceName\": \"\"}," + "\"pnfRegistrationFields\": {" + " \"unitType\": \"AirScale\"," + " \"serialNumber\": \"QTFCOC540002E\"," @@ -276,116 +361,77 @@ class DmaapConsumerJsonParserTest { + " \"unitFamily\": \"BBU\"," + " \"vendorName\": \"Nokia\"," + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + "}}}"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) - .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String jsonWithoutSourceName = - "[{\"event\": {" - + "\"commonEventHeader\": {}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}]"; - StepVerifier - .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceName))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); - } - - @Test - void whenPassingJsonWithoutIpInformation_validationThrowingAnException() { - String parsed = - "{\"event\": {" - + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV4IpAddress\": \"\"," - + " \"oamV6IpAddress\": \"\"" - + "}}}"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = new JsonParser().parse(parsed); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); String jsonWithoutIpInformation = - "[{\"event\": {" - + "\"commonEventHeader\": {\"sourceName\": \"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV4IpAddress\": \"\"," - + " \"oamV6IpAddress\": \"\"" - + "}}}]"; + "[{\"event\": {" + + "\"commonEventHeader\": {\"sourceName\": \"\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}]"; StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + .expectSubscription().thenRequest(1).verifyComplete(); } @Test - void whenPassingJsonWithoutSourceNameValue_validationThrowingAnException() { - String parsed = - "{\"event\": {" - + "\"commonEventHeader\": {\"sourceName\": \"\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}"; + void whenPassingCorrectJsoArraynWithoutIpv4_validationNotThrowingAnException() { + //given + String message = "[{\"event\": {" + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}," + + "{\"event\": {" + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}" + + "]"; - DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); - JsonElement jsonElement = new JsonParser().parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) - .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); - String jsonWithoutIpInformation = - "[{\"event\": {" - + "\"commonEventHeader\": {\"sourceName\": \"\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}]"; - StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformation))) - .expectSubscription().expectError(DmaapNotFoundException.class).verify(); + ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder().ipv4("10.16.123.234") + .ipv6("0:0:0:0:0:FFFF:0A10:7BEA") + .correlationId("NOKQTFCOC540002E").build(); + //when + DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); + + //then + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message))) + .expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete(); } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java index c128fb95..689a732c 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java @@ -42,6 +42,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -71,36 +72,36 @@ class DmaapConsumerTaskImplTest { appConfig = mock(AppConfig.class); message = "[{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}]"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}]"; parsed = "{\"event\": {" - + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," - + "\"pnfRegistrationFields\": {" - + " \"unitType\": \"AirScale\"," - + " \"serialNumber\": \"QTFCOC540002E\"," - + " \"pnfRegistrationFieldsVersion\": \"2.0\"," - + " \"manufactureDate\": \"1535014037024\"," - + " \"modelNumber\": \"7BEA\",\n" - + " \"lastServiceDate\": \"1535014037024\"," - + " \"unitFamily\": \"BBU\"," - + " \"vendorName\": \"Nokia\"," - + " \"oamV4IpAddress\": \"10.16.123.234\"," - + " \"softwareVersion\": \"v4.5.0.1\"," - + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" - + "}}}"; + + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"}," + + "\"pnfRegistrationFields\": {" + + " \"unitType\": \"AirScale\"," + + " \"serialNumber\": \"QTFCOC540002E\"," + + " \"pnfRegistrationFieldsVersion\": \"2.0\"," + + " \"manufactureDate\": \"1535014037024\"," + + " \"modelNumber\": \"7BEA\",\n" + + " \"lastServiceDate\": \"1535014037024\"," + + " \"unitFamily\": \"BBU\"," + + " \"vendorName\": \"Nokia\"," + + " \"oamV4IpAddress\": \"10.16.123.234\"," + + " \"softwareVersion\": \"v4.5.0.1\"," + + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\"" + + "}}}"; } @Test @@ -120,11 +121,11 @@ class DmaapConsumerTaskImplTest { //given prepareMocksForDmaapConsumer(Optional.of(message)); //when - Mono response = dmaapConsumerTask.execute("Sample input"); + Flux response = dmaapConsumerTask.execute("Sample input"); //then verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - assertEquals(consumerDmaapModel, response.block()); + assertEquals(consumerDmaapModel, response.blockFirst()); } -- cgit 1.2.3-korg