aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java46
1 files changed, 27 insertions, 19 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 081c7f39..9c33484d 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -20,16 +20,19 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -41,19 +44,20 @@ import reactor.core.publisher.Mono;
*/
public class DMaaPMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
-
+ private final AppConfig datafileAppConfig;
private final JsonMessageParser jsonMessageParser;
- private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+ private final ConsumerReactiveHttpClientFactory httpClientFactory;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
- this.jsonMessageParser = new JsonMessageParser();
- this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ this(datafileAppConfig, new JsonMessageParser(),
+ new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
- protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
- JsonMessageParser messageParser) {
- this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
- this.jsonMessageParser = messageParser;
+ protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
+ ConsumerReactiveHttpClientFactory httpClientFactory) {
+ this.datafileAppConfig = datafileAppConfig;
+ this.jsonMessageParser = jsonMessageParser;
+ this.httpClientFactory = httpClientFactory;
}
/**
@@ -63,19 +67,23 @@ public class DMaaPMessageConsumer {
*/
public Flux<FileReadyMessage> getMessageRouterResponse() {
logger.trace("getMessageRouterResponse called");
- return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
+ try {
+ DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient();
+ return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())));
+ } catch (DatafileTaskException e) {
+ logger.warn("Unable to get response from message router", e);
+ return Flux.empty();
+ }
}
- private Flux<FileReadyMessage> consume(Mono<String> message) {
+ private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
logger.trace("consume called with arg {}", message);
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
- throws DatafileTaskException {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
- WebClient client = new DmaapWebClient().fromConfiguration(config).build();
- return new DMaaPConsumerReactiveHttpClient(config, client);
+ public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
+
+ return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
}
}