diff options
Diffstat (limited to 'prh-app-server/src/main')
4 files changed, 32 insertions, 20 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 6db36a8a..8df564d2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -19,10 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Optional; +import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; /** @@ -36,5 +38,15 @@ abstract class DmaapConsumerTask { abstract void initConfigs(); + protected abstract DmaapConsumerConfiguration resolveConfiguration(); + protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; + + WebClient buildWebClient() { + DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration(); + return new DMaaPReactiveWebClient.WebClientBuilder() + .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType()) + .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName()) + .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build(); + } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 90382e51..8c74bac3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient; import org.slf4j.Logger; @@ -73,21 +72,15 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { prhAppConfig.initFileStreamReader(); } + @Override protected DmaapConsumerConfiguration resolveConfiguration() { return prhAppConfig.getDmaapConsumerConfiguration(); } @Override DMaaPConsumerReactiveHttpClient resolveClient() { - return Optional.ofNullable(dMaaPConsumerReactiveHttpClient) - .orElseGet(() -> { - DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration(); - return new DMaaPConsumerReactiveHttpClient(dmaapConsumerConfiguration).createDMaaPWebClient( - new DMaaPReactiveWebClient.WebClientBuilder() - .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType()) - .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName()) - .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build()); - }); + .orElseGet(() -> new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient( + buildWebClient())); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 27670e29..af8d14a3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -19,9 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; /** @@ -33,5 +36,15 @@ abstract class DmaapPublisherTask { abstract DMaaPProducerReactiveHttpClient resolveClient(); + protected abstract DmaapPublisherConfiguration resolveConfiguration(); + protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + + WebClient buildWebClient() { + DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration(); + return new DMaaPReactiveWebClient.WebClientBuilder() + .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType()) + .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName()) + .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build(); + } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index faf43bc7..11281d81 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -25,7 +25,6 @@ import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient; import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +63,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { return publish(consumerDmaapModel); } + @Override protected DmaapPublisherConfiguration resolveConfiguration() { return prhAppConfig.getDmaapPublisherConfiguration(); } @@ -71,13 +71,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { @Override DMaaPProducerReactiveHttpClient resolveClient() { return Optional.ofNullable(dMaaPProducerReactiveHttpClient) - .orElseGet(() -> { - DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration(); - return new DMaaPProducerReactiveHttpClient(dmaapPublisherConfiguration).createDMaaPWebClient( - new DMaaPReactiveWebClient.WebClientBuilder() - .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType()) - .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName()) - .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build()); - }); + .orElseGet(() -> new DMaaPProducerReactiveHttpClient(resolveConfiguration()) + .createDMaaPWebClient(buildWebClient())); } }
\ No newline at end of file |