aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java31
1 files changed, 10 insertions, 21 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index d3086cbe..f46e2cc9 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,15 +20,11 @@
package org.onap.dcaegen2.services.prh.tasks;
-import com.google.gson.JsonElement;
-import java.util.Optional;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+
@Autowired
public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+ this(config, new DmaapConsumerJsonParser());
}
- DmaapConsumerTaskImpl(Config prhAppConfig,
- DmaapConsumerJsonParser dmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ public Flux<ConsumerDmaapModel> execute(String object) {
+ MessageRouterSubscriber messageRouterSubscriberClient =
+ new MessageRouterSubscriberResolver().resolveClient();
LOGGER.debug("Method called with arg {}", object);
- Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(
- Optional.empty());
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
+ .get(config.getMessageRouterSubscribeRequest());
return dmaapConsumerJsonParser.getJsonObject(response);
}
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.getDmaapConsumerConfiguration());
- }
}