diff options
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java')
-rw-r--r-- | components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java | 72 |
1 files changed, 31 insertions, 41 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 283e5ef9..6c50b10d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,54 +20,48 @@ package org.onap.bbs.event.processor.tasks; -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; @Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private ApplicationConfiguration configuration; - private final PublisherReactiveHttpClientFactory httpClientFactory; - private DMaaPPublisherReactiveHttpClient httpClient; + private ApplicationConfiguration configuration; + private MessageRouterPublisher publisher; + private String publishUrl; + private MessageRouterPublishRequest publishRequest; @Autowired - DmaapPublisherTaskImpl(ApplicationConfiguration configuration) { - this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), - new ControlLoopJsonBodyBuilder())); - } - - DmaapPublisherTaskImpl(ApplicationConfiguration configuration, - PublisherReactiveHttpClientFactory httpClientFactory) { + DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) { this.configuration = configuration; - this.httpClientFactory = httpClientFactory; - - try { - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + this.publisher = publisher; + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + this.configuration.getDmaapProducerProperties().getDmaapProtocol(), + this.configuration.getDmaapProducerProperties().getDmaapHostName(), + this.configuration.getDmaapProducerProperties().getDmaapPortNumber(), + this.configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @PostConstruct @@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - try { - LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + publisher = + DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + configuration.getDmaapProducerProperties().getDmaapProtocol(), + configuration.getDmaapProducerProperties().getDmaapHostName(), + configuration.getDmaapProducerProperties().getDmaapPortNumber(), + configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @Override - public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { - if (controlLoopPublisherDmaapModel == null) { + public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) { + if (event == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } LOGGER.info("Executing task for publishing control loop message"); - LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); - DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - } - - private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { - return httpClient; + LOGGER.debug("CL message \n{}", event); + return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event))); } } |