From 3d1f84c127e2244a42d5e02d4c50f8e9f06000d1 Mon Sep 17 00:00:00 2001 From: wasala Date: Wed, 4 Jul 2018 14:12:23 +0200 Subject: Added reactive DMaaPClient Extracted WebCLientBuilder for Producer and Consumer. Added unit test for ReactiveProducerClient. Change-Id: I632e6928813ed9feb48982900c173f741e4483e3 Issue-ID: DCAEGEN2-563 Signed-off-by: wasala --- .../services/prh/tasks/DmaapConsumerTaskImpl.java | 28 +++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java') diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 08008f0a..90382e51 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -23,11 +23,10 @@ import java.util.Optional; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; -import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,7 +42,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; + private DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(AppConfig prhAppConfig) { @@ -58,16 +57,15 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override Mono consume(Mono message) { - logger.info("Consumed model from DmaaP: {}", message); + logger.info("Consumed model from DMaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } @Override public Mono execute(String object) { - dmaapConsumerReactiveHttpClient = resolveClient(); - dmaapConsumerReactiveHttpClient.initWebClient(); + dMaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); - return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse())); + return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @Override @@ -80,8 +78,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } @Override - DmaapConsumerReactiveHttpClient resolveClient() { - return Optional.ofNullable(dmaapConsumerReactiveHttpClient) - .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration())); + DMaaPConsumerReactiveHttpClient resolveClient() { + + return Optional.ofNullable(dMaaPConsumerReactiveHttpClient) + .orElseGet(() -> { + DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration(); + return new DMaaPConsumerReactiveHttpClient(dmaapConsumerConfiguration).createDMaaPWebClient( + new DMaaPReactiveWebClient.WebClientBuilder() + .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType()) + .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName()) + .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build()); + }); } } -- cgit 1.2.3-korg