aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java42
1 files changed, 10 insertions, 32 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index fd7bca1e..a990e502 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,19 +20,15 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
@@ -41,39 +37,21 @@ import reactor.core.publisher.Flux;
@Component
public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
- @Autowired
- public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
- DmaapConsumerTaskImpl(Config prhAppConfig,
- DmaapConsumerJsonParser dmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ public DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
}
@Override
- public void initConfigs() {
- config.initFileStreamReader();
+ public Flux<ConsumerDmaapModel> execute() {
+ MessageRouterSubscriber messageRouterSubscriber = config.getMessageRouterSubscriber();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = config.getMessageRouterSubscribeRequest();
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriber.get(messageRouterSubscribeRequest);
+ return dmaapConsumerJsonParser.getJsonObject(response);
}
- @Override
- public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
- LOGGER.debug("Method called with arg {}", object);
- return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
- }
-
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.getDmaapConsumerConfiguration());
- }
}