aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java9
1 files changed, 6 insertions, 3 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index e50ef580..f1d33454 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -29,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -43,7 +45,7 @@ public class DMaaPMessageConsumer {
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
@@ -69,8 +71,9 @@ public class DMaaPMessageConsumer {
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}