diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 9d818366..43eb9eaa 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,13 +20,13 @@ package org.onap.dcaegen2.services.prh.tasks; import java.util.Optional; -import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; - +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl; import org.slf4j.Logger; @@ -41,7 +41,7 @@ import org.springframework.stereotype.Component; public class DmaapConsumerTaskImpl extends DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> { - private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient; private DmaapConsumerJsonParser dmaapConsumerJsonParser; @@ -57,12 +57,27 @@ public class DmaapConsumerTaskImpl extends this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; } + @Override - ConsumerDmaapModel consume(String message) throws DmaapNotFoundException { - logger.trace("Method called with arg {}", message); + ConsumerDmaapModel consume(String message) throws PrhTaskException { + logger.info("Consumed model from DmaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message) - .orElseThrow(() -> new DmaapNotFoundException(String.format("Nothing to consume from DmaaP %s topic.", - resolveConfiguration().dmaapTopicName()))); + .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust")); + + } + + @Override + protected void receiveRequest(String body) throws PrhTaskException { + try { + ConsumerDmaapModel response = execute(body); + if (taskProcess != null && response != null) { + taskProcess.receiveRequest(response); + } + } catch (DmaapEmptyResponseException e) { + logger.warn("Nothing to consume from DmaaP {} topic.", + resolveConfiguration().dmaapTopicName()); + } + } @Override @@ -79,7 +94,7 @@ public class DmaapConsumerTaskImpl extends } @Override - DmaapConsumerConfiguration resolveConfiguration() { + protected DmaapConsumerConfiguration resolveConfiguration() { return prhAppConfig.getDmaapConsumerConfiguration(); } |