summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks
diff options
context:
space:
mode:
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java85
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java6
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java72
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java78
4 files changed, 114 insertions, 127 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index da510281..153cb91b 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,59 +20,61 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapCpeAuthenticationConsumerTaskImpl
- implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver {
+public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask,
+ ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
- new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
-
- private DMaaPConsumerReactiveHttpClient httpClient;
+ new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
@Autowired
- public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration,
- CpeAuthenticationDmaapConsumerJsonParser
- cpeAuthenticationDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException {
+ @Qualifier("CpeAuthMessageRouterSubscriber")
+ MessageRouterSubscriber subscriber,
+ CpeAuthenticationDmaapConsumerJsonParser parser) {
+ this.cpeAuthenticationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
-
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +89,25 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@Override
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest)
+ .flatMap(jsonElement ->
+ cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +115,4 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index 749c4e53..dec1dbcd 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
public interface DmaapPublisherTask {
- Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 283e5ef9..6c50b10d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,48 @@
package org.onap.bbs.event.processor.tasks;
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private ApplicationConfiguration configuration;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- private DMaaPPublisherReactiveHttpClient httpClient;
+ private ApplicationConfiguration configuration;
+ private MessageRouterPublisher publisher;
+ private String publishUrl;
+ private MessageRouterPublishRequest publishRequest;
@Autowired
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
- this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
- new ControlLoopJsonBodyBuilder()));
- }
-
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) {
this.configuration = configuration;
- this.httpClientFactory = httpClientFactory;
-
- try {
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ this.publisher = publisher;
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ this.configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapProducerProperties().getDmaapHostName(),
+ this.configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@PostConstruct
@@ -83,27 +77,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- try {
- LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ publisher =
+ DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ configuration.getDmaapProducerProperties().getDmaapHostName(),
+ configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@Override
- public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
- if (controlLoopPublisherDmaapModel == null) {
+ public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) {
+ if (event == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message");
- LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- }
-
- private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
- return httpClient;
+ LOGGER.debug("CL message \n{}", event);
+ return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event)));
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index e40037b1..aff563c5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,25 +20,23 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@@ -49,30 +47,33 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP");
- private DMaaPConsumerReactiveHttpClient httpClient;
-
@Autowired
- public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new ReRegistrationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration,
- ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory)
- throws SSLException {
+ @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber,
+ ReRegistrationDmaapConsumerJsonParser parser) {
+ this.reRegistrationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
-
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +88,25 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapReRegistrationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@Override
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest)
+ .flatMap(jsonElement ->
+ reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +114,4 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}