summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java72
1 files changed, 31 insertions, 41 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 283e5ef9..6c50b10d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,48 @@
package org.onap.bbs.event.processor.tasks;
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private ApplicationConfiguration configuration;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- private DMaaPPublisherReactiveHttpClient httpClient;
+ private ApplicationConfiguration configuration;
+ private MessageRouterPublisher publisher;
+ private String publishUrl;
+ private MessageRouterPublishRequest publishRequest;
@Autowired
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
- this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
- new ControlLoopJsonBodyBuilder()));
- }
-
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) {
this.configuration = configuration;
- this.httpClientFactory = httpClientFactory;
-
- try {
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ this.publisher = publisher;
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ this.configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapProducerProperties().getDmaapHostName(),
+ this.configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@PostConstruct
@@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- try {
- LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ publisher =
+ DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ configuration.getDmaapProducerProperties().getDmaapHostName(),
+ configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@Override
- public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
- if (controlLoopPublisherDmaapModel == null) {
+ public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) {
+ if (event == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message");
- LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- }
-
- private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
- return httpClient;
+ LOGGER.debug("CL message \n{}", event);
+ return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event)));
}
}