aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java50
1 files changed, 21 insertions, 29 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 63e01c12..2890d195 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,46 @@
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.JsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-
+import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+
+import java.util.function.Supplier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Config config;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- @Autowired
- public DmaapPublisherTaskImpl(Config config) {
- this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new JsonBodyBuilderImpl()));
- }
+ private final Supplier<MessageRouterPublishRequest> publishRequestSupplier;
+ private final Supplier<MessageRouterPublisher> publisherSupplier;
+ private final PnfReadyJsonBodyBuilder pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilder();
- DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
- this.config = config;
- this.httpClientFactory = httpClientFactory;
+
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> publishRequestSupplier,
+ Supplier<MessageRouterPublisher> publisherSupplier) {
+ this.publishRequestSupplier = publishRequestSupplier;
+ this.publisherSupplier = publisherSupplier;
}
@Override
- public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
+ public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
- }
-
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() {
- return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+ MessageRouterPublisher messageRouterPublisher = publisherSupplier.get();
+ MessageRouterPublishRequest messageRouterPublishRequest = publishRequestSupplier.get();
+ return messageRouterPublisher.put(
+ messageRouterPublishRequest,
+ Flux.just(pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel)));
}
} \ No newline at end of file