diff options
Diffstat (limited to 'prh-app-server/src/main')
4 files changed, 20 insertions, 30 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 2f947d47..58b29d2e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -34,7 +33,5 @@ interface DmaapConsumerTask { Flux<ConsumerDmaapModel> execute(String object); - Flux<ConsumerDmaapModel> consume(Mono<String> message); - DMaaPConsumerReactiveHttpClient resolveClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index c4d9c44a..a52163b7 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -22,15 +22,15 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.services.prh.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.prh.service.consumer.DMaaPReactiveWebClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -41,18 +41,20 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private final DmaapConsumerJsonParser dmaapConsumerJsonParser; - private final DMaaPReactiveWebClient dmaapReactiveWebClient; + private final ConsumerReactiveHttpClientFactory httpClientFactory; @Autowired public DmaapConsumerTaskImpl(Config config) { - this(config, new DmaapConsumerJsonParser(), new DMaaPReactiveWebClient()); + this(config, new DmaapConsumerJsonParser(), + new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClient())); } - DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser, - DMaaPReactiveWebClient dmaapReactiveWebClient) { + DmaapConsumerTaskImpl(Config prhAppConfig, + DmaapConsumerJsonParser dmaapConsumerJsonParser, + ConsumerReactiveHttpClientFactory httpClientFactory) { this.config = prhAppConfig; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.dmaapReactiveWebClient = dmaapReactiveWebClient; + this.httpClientFactory = httpClientFactory; } @Override @@ -64,17 +66,11 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { public Flux<ConsumerDmaapModel> execute(String object) { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); - return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); - } - - @Override - public Flux<ConsumerDmaapModel> consume(Mono<String> message) { - return dmaapConsumerJsonParser.getJsonObject(message); + return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); } @Override public DMaaPConsumerReactiveHttpClient resolveClient() { - return new DMaaPConsumerReactiveHttpClient( - config.getDmaapConsumerConfiguration()).createDMaaPWebClient(dmaapReactiveWebClient.build()); + return httpClientFactory.create(config.getDmaapConsumerConfiguration()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index dc0a4488..3f59815b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -33,7 +33,5 @@ interface DmaapPublisherTask { Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - DMaaPPublisherReactiveHttpClient resolveClient(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index fdc5e625..76c1bb56 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -24,12 +24,12 @@ import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient; +import org.onap.dcaegen2.services.prh.service.producer.PublisherReactiveHttpClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Mono; /** @@ -40,11 +40,16 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); private final Config config; - private DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient; + private final PublisherReactiveHttpClientFactory httpClientFactory; @Autowired public DmaapPublisherTaskImpl(Config config) { + this(config, new PublisherReactiveHttpClientFactory()); + } + + DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { this.config = config; + this.httpClientFactory = httpClientFactory; } @Override @@ -52,19 +57,13 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - dmaapPublisherReactiveHttpClient = resolveClient(); + DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return publish(consumerDmaapModel); - } - - @Override - public Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) { return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override public DMaaPPublisherReactiveHttpClient resolveClient() { - return new DMaaPPublisherReactiveHttpClient(config.getDmaapPublisherConfiguration()) - .createDMaaPWebClient(new RestTemplate()); + return httpClientFactory.create(config.getDmaapPublisherConfiguration()); } }
\ No newline at end of file |