aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2019-05-16 17:44:45 +0200
committerpwielebs <piotr.wielebski@nokia.com>2019-05-22 14:01:54 +0200
commit2cf649dda43c7fc7650b5d0047ccc57108918724 (patch)
tree03d07378786376e077f7d95a6a98c4f66ab85719 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks
parenta4f457e46a336a30ceea69a742e8b8aa8f2e720f (diff)
Align PRH to El Alto SDK
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c Issue-ID: DCAEGEN2-1501 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java31
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java78
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java8
8 files changed, 120 insertions, 101 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
index 02691446..0b26890d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
@@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper;
import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper;
import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer;
import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
-import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-
-
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
@@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask {
@Autowired
BbsActionsTaskImpl(Config config) {
- this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext()));
+ this(config, RxHttpClientFactory.createInsecure());
}
BbsActionsTaskImpl(Config config, RxHttpClient httpClient) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 3a630a40..5fc41d93 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -20,20 +20,15 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
import reactor.core.publisher.Flux;
+import javax.net.ssl.SSLException;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
interface DmaapConsumerTask {
-
Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
-
- DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index d3086cbe..f46e2cc9 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,15 +20,11 @@
package org.onap.dcaegen2.services.prh.tasks;
-import com.google.gson.JsonElement;
-import java.util.Optional;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+
@Autowired
public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+ this(config, new DmaapConsumerJsonParser());
}
- DmaapConsumerTaskImpl(Config prhAppConfig,
- DmaapConsumerJsonParser dmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+ public Flux<ConsumerDmaapModel> execute(String object) {
+ MessageRouterSubscriber messageRouterSubscriberClient =
+ new MessageRouterSubscriberResolver().resolveClient();
LOGGER.debug("Method called with arg {}", object);
- Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(
- Optional.empty());
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
+ .get(config.getMessageRouterSubscribeRequest());
return dmaapConsumerJsonParser.getJsonObject(response);
}
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.getDmaapConsumerConfiguration());
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 7fc596c1..f63f4d76 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -20,10 +20,10 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -31,16 +31,8 @@ import reactor.core.publisher.Mono;
*/
public interface DmaapPublisherTask {
- /**
- *
- * Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
- * */
- @Deprecated
- Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
- execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
- Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
- DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
+ Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 55a8bb58..3a724884 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.tasks;
+import com.google.gson.JsonPrimitive;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import javax.net.ssl.SSLException;
-import java.util.Optional;
import java.util.function.Supplier;
/**
@@ -46,70 +44,52 @@ import java.util.function.Supplier;
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Supplier<DmaapPublisherConfiguration> config;
+
+ private final Supplier<MessageRouterPublishRequest> config;
+ private final MessageRouterPublisherResolver messageRouterPublisherClientResolver;
private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) {
- this(config, new PublisherReactiveHttpClientFactory(
- new DmaaPRestTemplateFactory(),
- new PnfReadyJsonBodyBuilderImpl()));
- }
- DmaapPublisherTaskImpl(
- Supplier<DmaapPublisherConfiguration> config,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) {
this.config = config;
- this.httpClientFactory = httpClientFactory;
+ this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver;
}
@Override
- public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
- execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException {
+ public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
+ MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
- }
-
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.get());
-
+ String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+ return messageRouterPublisher.put(
+ config.get(),
+ Flux.just(json).map(JsonPrimitive::new));
}
/**
*
* Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
* */
@Override
public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
- DefaultHttpClient httpClient = new DefaultHttpClient();
- HttpPost postRequest = new HttpPost(getUrl());
- try {
- StringEntity input = new StringEntity(json);
- input.setContentType(config.get().dmaapContentType());
- postRequest.setEntity(input);
- HttpResponse response = httpClient.execute(postRequest);
- return Mono.just(response);
- } catch (Exception e) {
- LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
- return Mono.error(e);
+ try (DefaultHttpClient httpClient = new DefaultHttpClient()) {
+ HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl());
+ try {
+ StringEntity input = new StringEntity(json);
+ input.setContentType(config.get().contentType());
+ postRequest.setEntity(input);
+ HttpResponse response = httpClient.execute(postRequest);
+ return Mono.just(response);
+ } catch (Exception e) {
+ LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+ return Mono.error(e);
+ }
}
}
- private String getUrl() {
- return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol())
- .host(config.get().dmaapHostName())
- .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build()
- .toString();
- }
- private String createRequestPath() {
- return "/" + config.get().dmaapTopicName();
- }
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
new file mode 100644
index 00000000..2f4e3867
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterPublisherResolver {
+
+ public MessageRouterPublisher resolveClient() {
+ return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
new file mode 100644
index 00000000..63930ef7
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterSubscriberResolver {
+
+ public MessageRouterSubscriber resolveClient() {
+ return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 72ec0cac..4b3436e5 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -50,6 +51,7 @@ public class ScheduledTasks {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapReadyProducerTask;
private final DmaapPublisherTask dmaapUpdateProducerTask;
@@ -208,7 +210,7 @@ public class ScheduledTasks {
* Marked as deprecated due to problems with DMaaP MR, to be fixed in future
*/
@Deprecated
- private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
+ private Flux<MessageRouterPublishResponse>
publishToDmaapConfiguration(final State state) {
try {
if (state.ActivationStatus) {
@@ -217,8 +219,8 @@ public class ScheduledTasks {
}
return dmaapReadyProducerTask.execute(state.DmaapModel);
- } catch (PrhTaskException | SSLException e) {
- return Mono.error(e);
+ } catch (PrhTaskException e) {
+ return Flux.error(e);
}
}