diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-07-04 14:12:23 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-07 09:45:22 +0200 |
commit | 3d1f84c127e2244a42d5e02d4c50f8e9f06000d1 (patch) | |
tree | a9a661e395404c2d0f8203041a7ab6c5f54acd50 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java | |
parent | 79984d737c71d3c92f3cd283eaf2b9b6157c2ce2 (diff) |
Added reactive DMaaPClient
Extracted WebCLientBuilder for
Producer and Consumer.
Added unit test for ReactiveProducerClient.
Change-Id: I632e6928813ed9feb48982900c173f741e4483e3
Issue-ID: DCAEGEN2-563
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 08008f0a..90382e51 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -23,11 +23,10 @@ import java.util.Optional; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; -import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,7 +42,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; - private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; + private DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient; @Autowired public DmaapConsumerTaskImpl(AppConfig prhAppConfig) { @@ -58,16 +57,15 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override Mono<ConsumerDmaapModel> consume(Mono<String> message) { - logger.info("Consumed model from DmaaP: {}", message); + logger.info("Consumed model from DMaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } @Override public Mono<ConsumerDmaapModel> execute(String object) { - dmaapConsumerReactiveHttpClient = resolveClient(); - dmaapConsumerReactiveHttpClient.initWebClient(); + dMaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); - return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse())); + return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @Override @@ -80,8 +78,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } @Override - DmaapConsumerReactiveHttpClient resolveClient() { - return Optional.ofNullable(dmaapConsumerReactiveHttpClient) - .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration())); + DMaaPConsumerReactiveHttpClient resolveClient() { + + return Optional.ofNullable(dMaaPConsumerReactiveHttpClient) + .orElseGet(() -> { + DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration(); + return new DMaaPConsumerReactiveHttpClient(dmaapConsumerConfiguration).createDMaaPWebClient( + new DMaaPReactiveWebClient.WebClientBuilder() + .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType()) + .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName()) + .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build()); + }); } } |