aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java23
1 files changed, 7 insertions, 16 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index f46e2cc9..af5b2505 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -24,10 +24,8 @@ import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -39,28 +37,21 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- @Autowired
- public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser());
- }
-
- DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ public DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) {
- MessageRouterSubscriber messageRouterSubscriberClient =
- new MessageRouterSubscriberResolver().resolveClient();
- LOGGER.debug("Method called with arg {}", object);
- Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
- .get(config.getMessageRouterSubscribeRequest());
+ public Flux<ConsumerDmaapModel> execute() {
+ MessageRouterSubscriber messageRouterSubscriber = config.getMessageRouterSubscriber();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = config.getMessageRouterSubscribeRequest();
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriber
+ .get(messageRouterSubscribeRequest);
return dmaapConsumerJsonParser.getJsonObject(response);
}