diff options
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks')
4 files changed, 114 insertions, 127 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java index da510281..153cb91b 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java @@ -20,59 +20,61 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Component -public class DmaapCpeAuthenticationConsumerTaskImpl - implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver { +public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask, + ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = - new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); - - private DMaaPConsumerReactiveHttpClient httpClient; + new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP"); @Autowired - public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration, - CpeAuthenticationDmaapConsumerJsonParser - cpeAuthenticationDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException { + @Qualifier("CpeAuthMessageRouterSubscriber") + MessageRouterSubscriber subscriber, + CpeAuthenticationDmaapConsumerJsonParser parser) { + this.cpeAuthenticationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +89,25 @@ public class DmaapCpeAuthenticationConsumerTaskImpl @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()); } @Override public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest) + .flatMap(jsonElement -> + cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +115,4 @@ public class DmaapCpeAuthenticationConsumerTaskImpl } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java index 749c4e53..dec1dbcd 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java @@ -21,11 +21,11 @@ package org.onap.bbs.event.processor.tasks; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; public interface DmaapPublisherTask { - Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); + Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java index 283e5ef9..6c50b10d 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java @@ -20,54 +20,48 @@ package org.onap.bbs.event.processor.tasks; -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.DmaapException; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; @Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private ApplicationConfiguration configuration; - private final PublisherReactiveHttpClientFactory httpClientFactory; - private DMaaPPublisherReactiveHttpClient httpClient; + private ApplicationConfiguration configuration; + private MessageRouterPublisher publisher; + private String publishUrl; + private MessageRouterPublishRequest publishRequest; @Autowired - DmaapPublisherTaskImpl(ApplicationConfiguration configuration) { - this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), - new ControlLoopJsonBodyBuilder())); - } - - DmaapPublisherTaskImpl(ApplicationConfiguration configuration, - PublisherReactiveHttpClientFactory httpClientFactory) { + DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) { this.configuration = configuration; - this.httpClientFactory = httpClientFactory; - - try { - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + this.publisher = publisher; + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + this.configuration.getDmaapProducerProperties().getDmaapProtocol(), + this.configuration.getDmaapProducerProperties().getDmaapHostName(), + this.configuration.getDmaapProducerProperties().getDmaapPortNumber(), + this.configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @PostConstruct @@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration @Override public synchronized void updateConfiguration() { LOGGER.info("DMaaP Publisher update due to new application configuration"); - try { - LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage()); - LOGGER.debug("SSL exception\n", e); - } + publisher = + DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration()); + publishUrl = String.format(PUBLISH_URL_TEMPLATE, + configuration.getDmaapProducerProperties().getDmaapProtocol(), + configuration.getDmaapProducerProperties().getDmaapHostName(), + configuration.getDmaapProducerProperties().getDmaapPortNumber(), + configuration.getDmaapProducerProperties().getDmaapTopicName()); + publishRequest = createPublishRequest(publishUrl); } @Override - public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) { - if (controlLoopPublisherDmaapModel == null) { + public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) { + if (event == null) { throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message"); } LOGGER.info("Executing task for publishing control loop message"); - LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel); - DMaaPPublisherReactiveHttpClient httpClient = getHttpClient(); - return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty()); - } - - private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() { - return httpClient; + LOGGER.debug("CL message \n{}", event); + return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event))); } } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java index e40037b1..aff563c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java @@ -20,25 +20,23 @@ package org.onap.bbs.event.processor.tasks; -import com.google.gson.JsonElement; - -import java.util.Optional; +import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE; +import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.ApplicationConfiguration; import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @@ -49,30 +47,33 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC ConfigurationChangeObserver { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class); - private ApplicationConfiguration configuration; + private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; + private ApplicationConfiguration configuration; + private MessageRouterSubscriber subscriber; + private String subscribeUrl; + private MessageRouterSubscribeRequest subscribeRequest; private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION = new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP"); - private DMaaPConsumerReactiveHttpClient httpClient; - @Autowired - public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException { - this(configuration, new ReRegistrationDmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration, - ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) - throws SSLException { + @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber, + ReRegistrationDmaapConsumerJsonParser parser) { + this.reRegistrationDmaapConsumerJsonParser = parser; this.configuration = configuration; - this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; - - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); + this.subscriber = subscriber; + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @PostConstruct @@ -87,24 +88,25 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC @Override public synchronized void updateConfiguration() { - try { - LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); - LOGGER.info("Creating secure context with:\n {}", - this.configuration.getDmaapReRegistrationConsumerConfiguration()); - httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration()); - } catch (SSLException e) { - LOGGER.error("SSL error while updating HTTP Client after a config update"); - LOGGER.debug("SSL exception\n", e); - } + LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration"); + subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE, + configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(), + configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()); + subscribeRequest = createSubscribeRequest( + subscribeUrl, + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(), + this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()); } @Override public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) { LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName); - DMaaPConsumerReactiveHttpClient httpClient = getHttpClient(); - Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty()); - return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response) - .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION)) + return subscriber.getElements(subscribeRequest) + .flatMap(jsonElement -> + reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement))) + .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION)) .doOnError(e -> { if (!(e instanceof EmptyDmaapResponseException)) { LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage()); @@ -112,8 +114,4 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC } }); } - - private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() { - return httpClient; - } } |