diff options
2 files changed, 21 insertions, 12 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index aed99747..bf3987cb 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,21 +23,25 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; -import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ public class DmaapConsumerJsonParser { + private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); + private static final String EVENT = "event"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields"; @@ -59,7 +63,7 @@ public class DmaapConsumerJsonParser { } private Mono<JsonElement> getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) + return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty") : Mono.fromCallable(() -> new JsonParser().parse(message)); } @@ -84,7 +88,7 @@ public class DmaapConsumerJsonParser { private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") : transform(monoJsonP)) .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); } @@ -101,8 +105,8 @@ public class DmaapConsumerJsonParser { String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address)) - ? Mono.error(new DmaapNotFoundException("Incorrect json, consumerDmaapModel can not be created: " - + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address))) : + ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " + + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) : Mono.just(ImmutableConsumerDmaapModel.builder() .correlationId(pnfSourceName) .ipv4(pnfOamIpv4Address) @@ -128,4 +132,9 @@ public class DmaapConsumerJsonParser { + "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\"" + "%n}", sourceName, oamIpv4Address, oamIpv6Address); } + + private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) { + LOGGER.warn(messageForLogger); + return Mono.empty(); + } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java index 689a732c..c17e2543 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java @@ -109,25 +109,25 @@ class DmaapConsumerTaskImplTest { //given prepareMocksForDmaapConsumer(Optional.empty()); - //then - StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() - .expectError(DmaapEmptyResponseException.class).verify(); + //when + Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input"); + //then verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + assertEquals(null, response.blockFirst()); } @Test void whenPassedObjectFits_ReturnsCorrectResponse() { //given prepareMocksForDmaapConsumer(Optional.of(message)); + //when Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input"); //then verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); assertEquals(consumerDmaapModel, response.blockFirst()); - - } private void prepareMocksForDmaapConsumer(Optional<String> message) { |