diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 081c7f39..9c33484d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -20,16 +20,19 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import com.google.gson.JsonElement; + +import java.util.Optional; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,19 +44,20 @@ import reactor.core.publisher.Mono; */ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); - + private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final ConsumerReactiveHttpClientFactory httpClientFactory; - public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException { - this.jsonMessageParser = new JsonMessageParser(); - this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); + public DMaaPMessageConsumer(AppConfig datafileAppConfig) { + this(datafileAppConfig, new JsonMessageParser(), + new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); } - protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - JsonMessageParser messageParser) { - this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; - this.jsonMessageParser = messageParser; + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, + ConsumerReactiveHttpClientFactory httpClientFactory) { + this.datafileAppConfig = datafileAppConfig; + this.jsonMessageParser = jsonMessageParser; + this.httpClientFactory = httpClientFactory; } /** @@ -63,19 +67,23 @@ public class DMaaPMessageConsumer { */ public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); - return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); + try { + DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient(); + return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))); + } catch (DatafileTaskException e) { + logger.warn("Unable to get response from message router", e); + return Flux.empty(); + } } - private Flux<FileReadyMessage> consume(Mono<String> message) { + private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { logger.trace("consume called with arg {}", message); return jsonMessageParser.getMessagesFromJson(message); } - private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) - throws DatafileTaskException { - DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap(); - WebClient client = new DmaapWebClient().fromConfiguration(config).build(); - return new DMaaPConsumerReactiveHttpClient(config, client); + public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { + + return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); } } |