aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java31
1 files changed, 23 insertions, 8 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 9d818366..43eb9eaa 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,13 +20,13 @@
package org.onap.dcaegen2.services.prh.tasks;
import java.util.Optional;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
import org.slf4j.Logger;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
public class DmaapConsumerTaskImpl extends
DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> {
- private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -57,12 +57,27 @@ public class DmaapConsumerTaskImpl extends
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
+
@Override
- ConsumerDmaapModel consume(String message) throws DmaapNotFoundException {
- logger.trace("Method called with arg {}", message);
+ ConsumerDmaapModel consume(String message) throws PrhTaskException {
+ logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException(String.format("Nothing to consume from DmaaP %s topic.",
- resolveConfiguration().dmaapTopicName())));
+ .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust"));
+
+ }
+
+ @Override
+ protected void receiveRequest(String body) throws PrhTaskException {
+ try {
+ ConsumerDmaapModel response = execute(body);
+ if (taskProcess != null && response != null) {
+ taskProcess.receiveRequest(response);
+ }
+ } catch (DmaapEmptyResponseException e) {
+ logger.warn("Nothing to consume from DmaaP {} topic.",
+ resolveConfiguration().dmaapTopicName());
+ }
+
}
@Override
@@ -79,7 +94,7 @@ public class DmaapConsumerTaskImpl extends
}
@Override
- DmaapConsumerConfiguration resolveConfiguration() {
+ protected DmaapConsumerConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapConsumerConfiguration();
}