diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java | 39 |
1 files changed, 14 insertions, 25 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 066983ae..0780e18e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +22,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; - -import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Component used to get messages from the MessageRouter. @@ -46,18 +40,14 @@ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; public DMaaPMessageConsumer(AppConfig datafileAppConfig) { - this(datafileAppConfig, new JsonMessageParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(datafileAppConfig, new JsonMessageParser()); } - protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) { this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = jsonMessageParser; - this.httpClientFactory = httpClientFactory; } /** @@ -68,21 +58,20 @@ public class DMaaPMessageConsumer { public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); try { - DMaaPConsumerReactiveHttpClient client = createHttpClient(); - return consume((client.getDMaaPConsumerResponse(Optional.empty()))); - } catch (DatafileTaskException e) { + ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); + MessageRouterSubscriber messageRouterSubscriber = + dmaapConsumerConfiguration.getMessageRouterSubscriber(); + Flux<JsonElement> responseElements = + messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + return consume(responseElements); + } catch (Exception e) { logger.warn("Unable to get response from message router", e); return Flux.empty(); } } - private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { - logger.trace("consume called with arg {}", message); - return jsonMessageParser.getMessagesFromJson(message); - } - - public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { - return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); + private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) { + return jsonMessageParser.getMessagesFromJson(messages); } } |