diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java | 79 |
1 files changed, 55 insertions, 24 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java index 30e6cff1..6b289f1c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java @@ -22,10 +22,8 @@ package org.onap.dcaegen2.services.prh.tasks.commit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.configurationprocessor.json.JSONException; import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; @@ -33,50 +31,72 @@ import org.springframework.kafka.listener.BatchAcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; - import java.util.ArrayList; import java.util.List; /** - * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on 3/13/23 + * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on + * 3/13/23 */ @Profile("autoCommitDisabled") @Component public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener<String, String> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class); - - @Autowired + + private DmaapConsumerJsonParser dmaapConsumerJsonParser; - @Autowired private EpochDateTimeConversion epochDateTimeConversion; + private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode; + private List<String> jsonEvent = new ArrayList<>(); + public List<String> getJsonEvent() { + return jsonEvent; + } + private Acknowledgment offset; - String kafkaTopic = System.getenv("kafkaTopic"); + public Acknowledgment getOffset() { + return offset; + } + + static String commonInURL = "/events/"; + + String kafkaTopic; - String groupIdConfig = System.getenv("groupIdConfig"); + String groupIdConfig; + + + public KafkaConsumerTaskImpl(CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode + ,DmaapConsumerJsonParser dmaapConsumerJsonParser,EpochDateTimeConversion epochDateTimeConversion) { + this.cbsConfigurationForAutoCommitDisabledMode = cbsConfigurationForAutoCommitDisabledMode; + this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; + this.epochDateTimeConversion = epochDateTimeConversion; + String kafkaTopicURL = this.cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest() + .sourceDefinition().topicUrl(); + kafkaTopic = getTopicFromTopicUrl(kafkaTopicURL); + groupIdConfig = cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest().consumerGroup(); + + System.setProperty("kafkaTopic", kafkaTopic); + System.setProperty("groupIdConfig", groupIdConfig); + + } @Override @KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}") public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) { - - if (list != null && !list.isEmpty()) { - - - list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay()) - .map(ConsumerRecord::value) - .forEach(value -> { - jsonEvent.add(value); + + if (list != null && !list.isEmpty()) { + list.stream().filter( + consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() + && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay()) + .map(ConsumerRecord::value).forEach(value -> { + jsonEvent.add(value); }); - } - offset = acknowledgment; } @@ -86,14 +106,25 @@ public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledg return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent); } + public void setJsonEvent(List<String> jsonEvent) { + this.jsonEvent = jsonEvent; + } + @Override public void commitOffset() { - if(!jsonEvent.isEmpty()){ + if (!jsonEvent.isEmpty()) { jsonEvent.clear(); } - if(offset != null){ + if (offset != null) { offset.acknowledge(); } } + public String getTopicFromTopicUrl(String topicUrl) { + if (topicUrl.endsWith("/")) { + return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length(), topicUrl.lastIndexOf("/")); + } + return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length()); + } + } |