diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit')
3 files changed, 94 insertions, 74 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java index 30e6cff1..6b289f1c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java @@ -22,10 +22,8 @@ package org.onap.dcaegen2.services.prh.tasks.commit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.configurationprocessor.json.JSONException; import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.KafkaListener; @@ -33,50 +31,72 @@ import org.springframework.kafka.listener.BatchAcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; - import java.util.ArrayList; import java.util.List; /** - * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on 3/13/23 + * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on + * 3/13/23 */ @Profile("autoCommitDisabled") @Component public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener<String, String> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class); - - @Autowired + + private DmaapConsumerJsonParser dmaapConsumerJsonParser; - @Autowired private EpochDateTimeConversion epochDateTimeConversion; + private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode; + private List<String> jsonEvent = new ArrayList<>(); + public List<String> getJsonEvent() { + return jsonEvent; + } + private Acknowledgment offset; - String kafkaTopic = System.getenv("kafkaTopic"); + public Acknowledgment getOffset() { + return offset; + } + + static String commonInURL = "/events/"; + + String kafkaTopic; - String groupIdConfig = System.getenv("groupIdConfig"); + String groupIdConfig; + + + public KafkaConsumerTaskImpl(CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode + ,DmaapConsumerJsonParser dmaapConsumerJsonParser,EpochDateTimeConversion epochDateTimeConversion) { + this.cbsConfigurationForAutoCommitDisabledMode = cbsConfigurationForAutoCommitDisabledMode; + this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; + this.epochDateTimeConversion = epochDateTimeConversion; + String kafkaTopicURL = this.cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest() + .sourceDefinition().topicUrl(); + kafkaTopic = getTopicFromTopicUrl(kafkaTopicURL); + groupIdConfig = cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest().consumerGroup(); + + System.setProperty("kafkaTopic", kafkaTopic); + System.setProperty("groupIdConfig", groupIdConfig); + + } @Override @KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}") public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) { - - if (list != null && !list.isEmpty()) { - - - list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay()) - .map(ConsumerRecord::value) - .forEach(value -> { - jsonEvent.add(value); + + if (list != null && !list.isEmpty()) { + list.stream().filter( + consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() + && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay()) + .map(ConsumerRecord::value).forEach(value -> { + jsonEvent.add(value); }); - } - offset = acknowledgment; } @@ -86,14 +106,25 @@ public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledg return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent); } + public void setJsonEvent(List<String> jsonEvent) { + this.jsonEvent = jsonEvent; + } + @Override public void commitOffset() { - if(!jsonEvent.isEmpty()){ + if (!jsonEvent.isEmpty()) { jsonEvent.clear(); } - if(offset != null){ + if (offset != null) { offset.acknowledge(); } } + public String getTopicFromTopicUrl(String topicUrl) { + if (topicUrl.endsWith("/")) { + return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length(), topicUrl.lastIndexOf("/")); + } + return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length()); + } + } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java index 64d7798e..91cdd122 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; @@ -52,7 +51,6 @@ public class ScheduledTasksRunnerWithCommit { private final TaskScheduler taskScheduler; private final PrhProperties prhProperties; - @Autowired private ScheduledTasksWithCommit scheduledTasksWithCommit; public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler, ScheduledTasksWithCommit scheduledTasksWithCommit, @@ -64,7 +62,8 @@ public class ScheduledTasksRunnerWithCommit { @EventListener public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { - tryToStartTaskWithCommit(); + LOGGER.info(ENTRY,"### in onApplicationStartedEvent"); + LOGGER.info(ENTRY,"###tryToStartTaskWithCommit="+tryToStartTaskWithCommit()); } /** @@ -72,6 +71,7 @@ public class ScheduledTasksRunnerWithCommit { */ @PreDestroy public synchronized void cancelTasks() { + LOGGER.info(ENTRY,"###In cancelTasks"); scheduledPrhTaskFutureList.forEach(x -> x.cancel(false)); scheduledPrhTaskFutureList.clear(); } @@ -96,4 +96,3 @@ public class ScheduledTasksRunnerWithCommit { } } - diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java index b0eae949..352c0bbc 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java @@ -33,8 +33,6 @@ import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -48,7 +46,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23 + * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> + * on 3/13/23 */ @Profile("autoCommitDisabled") @Component @@ -59,7 +58,7 @@ public class ScheduledTasksWithCommit { private KafkaConsumerTask kafkaConsumerTask; private DmaapPublisherTask dmaapReadyProducerTask; private DmaapPublisherTask dmaapUpdateProducerTask; - private AaiQueryTask aaiQueryTask; + public AaiQueryTask aaiQueryTask; private AaiProducerTask aaiProducerTask; private BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; @@ -73,17 +72,16 @@ public class ScheduledTasksWithCommit { * @param aaiPublisherTask - second task */ @Autowired - public ScheduledTasksWithCommit( - final KafkaConsumerTask kafkaConsumerTask, - @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, - @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, - final AaiQueryTask aaiQueryTask, - final AaiProducerTask aaiPublisherTask, - final BbsActionsTask bbsActionsTask, - final Map<String, String> mdcContextMap) { + public ScheduledTasksWithCommit(final KafkaConsumerTask kafkaConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, final Map<String, String> mdcContextMap) + + { this.dmaapReadyProducerTask = dmaapReadyPublisherTask; this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; - this.kafkaConsumerTask=kafkaConsumerTask; + this.kafkaConsumerTask = kafkaConsumerTask; this.aaiQueryTask = aaiQueryTask; this.aaiProducerTask = aaiPublisherTask; this.bbsActionsTask = bbsActionsTask; @@ -92,7 +90,7 @@ public class ScheduledTasksWithCommit { static class State { public ConsumerDmaapModel dmaapModel; - public Boolean activationStatus; + public Boolean activationStatus; public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { this.dmaapModel = dmaapModel; @@ -103,50 +101,47 @@ public class ScheduledTasksWithCommit { public void scheduleKafkaPrhEventTask() { MdcVariables.setMdcContextMap(mdcContextMap); try { + LOGGER.info("Execution of tasks was registered with commit"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromKafkaMessage() - .flatMap(model->queryAaiForPnf(model) - .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e); - disableCommit(); - }) - .onErrorResume(e -> Mono.empty()) - - ) - .flatMap(this::queryAaiForConfiguration) - .flatMap(this::publishToAaiConfiguration) - .flatMap(this::processAdditionalFields) - .flatMap(this::publishToDmaapConfiguration) + .flatMap(model -> queryAaiForPnf(model).doOnError(e -> { + LOGGER.info("PNF Not Found in AAI --> {}" + e); + LOGGER.info("PNF Not Found in AAI With description of exception --> {}" + e.getMessage()); + disableCommit(); + }).onErrorResume(e -> Mono.empty()) + + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::processAdditionalFields).flatMap(this::publishToDmaapConfiguration) + .onErrorResume(e -> Mono.empty()) - - .doOnTerminate(mainCountDownLatch::countDown) - .subscribe(this::onSuccess, this::onError, this::onCompleteKafka); + + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onCompleteKafka); mainCountDownLatch.await(); - } catch (InterruptedException | JSONException e ) { + } catch (InterruptedException | JSONException e) { LOGGER.warn("Interruption problem on countDownLatch {}", e); Thread.currentThread().interrupt(); } } - private static void disableCommit() - { - pnfFound=false; + private static void disableCommit() { + pnfFound = false; } private void onCompleteKafka() { LOGGER.info("PRH tasks have been completed"); - if(pnfFound){ + if (pnfFound) { kafkaConsumerTask.commitOffset(); LOGGER.info("Committed the Offset"); - } - else - { + } else { LOGGER.info("Offset not Committed"); - pnfFound=true; + pnfFound = true; } } - private void onSuccess(MessageRouterPublishResponse response) { if (response.successful()) { String statusCodeOk = HttpStatus.OK.name(); @@ -167,23 +162,18 @@ public class ScheduledTasksWithCommit { } private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { - return aaiQueryTask - .execute(monoDMaaPModel) - .map(x -> new State(monoDMaaPModel, x)); + return aaiQueryTask.execute(monoDMaaPModel).map(x -> new State(monoDMaaPModel, x)); } private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) { - LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId()); + LOGGER.info("Find PNF --> " + monoDMaaPModel.getCorrelationId()); return aaiQueryTask.findPnfinAAI(monoDMaaPModel); } - private Mono<State> publishToAaiConfiguration(final State state) { try { - return aaiProducerTask - .execute(state.dmaapModel) - .map(x -> state); + return aaiProducerTask.execute(state.dmaapModel).map(x -> state); } catch (PrhTaskException e) { LOGGER.warn("AAIProducerTask exception has been registered: {}", e); return Mono.error(e); |