diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java | 9 |
1 files changed, 4 insertions, 5 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 564a7a41..3181c069 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; @@ -41,6 +40,7 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; @@ -59,18 +59,17 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override - Mono<ConsumerDmaapModel> consume(Mono<String> message) { + Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException { logger.info("Consumed model from DmaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } - @Override - public Mono<ConsumerDmaapModel> execute(String object) { + public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException { dmaapConsumerReactiveHttpClient = resolveClient(); // dmaapConsumerReactiveHttpClient.initWebClient(); logger.trace("Method called with arg {}", object); - return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne())); + return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse())); } @Override |