aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java9
1 files changed, 4 insertions, 5 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 564a7a41..3181c069 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
@@ -41,6 +40,7 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
+
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -59,18 +59,17 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
- Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
-
@Override
- public Mono<ConsumerDmaapModel> execute(String object) {
+ public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
dmaapConsumerReactiveHttpClient = resolveClient();
// dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
- return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
+ return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
}
@Override