diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java | 82 |
1 files changed, 36 insertions, 46 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java index b0eae949..352c0bbc 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java @@ -33,8 +33,6 @@ import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -48,7 +46,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23 + * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> + * on 3/13/23 */ @Profile("autoCommitDisabled") @Component @@ -59,7 +58,7 @@ public class ScheduledTasksWithCommit { private KafkaConsumerTask kafkaConsumerTask; private DmaapPublisherTask dmaapReadyProducerTask; private DmaapPublisherTask dmaapUpdateProducerTask; - private AaiQueryTask aaiQueryTask; + public AaiQueryTask aaiQueryTask; private AaiProducerTask aaiProducerTask; private BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; @@ -73,17 +72,16 @@ public class ScheduledTasksWithCommit { * @param aaiPublisherTask - second task */ @Autowired - public ScheduledTasksWithCommit( - final KafkaConsumerTask kafkaConsumerTask, - @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, - @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, - final AaiQueryTask aaiQueryTask, - final AaiProducerTask aaiPublisherTask, - final BbsActionsTask bbsActionsTask, - final Map<String, String> mdcContextMap) { + public ScheduledTasksWithCommit(final KafkaConsumerTask kafkaConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, final Map<String, String> mdcContextMap) + + { this.dmaapReadyProducerTask = dmaapReadyPublisherTask; this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; - this.kafkaConsumerTask=kafkaConsumerTask; + this.kafkaConsumerTask = kafkaConsumerTask; this.aaiQueryTask = aaiQueryTask; this.aaiProducerTask = aaiPublisherTask; this.bbsActionsTask = bbsActionsTask; @@ -92,7 +90,7 @@ public class ScheduledTasksWithCommit { static class State { public ConsumerDmaapModel dmaapModel; - public Boolean activationStatus; + public Boolean activationStatus; public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { this.dmaapModel = dmaapModel; @@ -103,50 +101,47 @@ public class ScheduledTasksWithCommit { public void scheduleKafkaPrhEventTask() { MdcVariables.setMdcContextMap(mdcContextMap); try { + LOGGER.info("Execution of tasks was registered with commit"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromKafkaMessage() - .flatMap(model->queryAaiForPnf(model) - .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e); - disableCommit(); - }) - .onErrorResume(e -> Mono.empty()) - - ) - .flatMap(this::queryAaiForConfiguration) - .flatMap(this::publishToAaiConfiguration) - .flatMap(this::processAdditionalFields) - .flatMap(this::publishToDmaapConfiguration) + .flatMap(model -> queryAaiForPnf(model).doOnError(e -> { + LOGGER.info("PNF Not Found in AAI --> {}" + e); + LOGGER.info("PNF Not Found in AAI With description of exception --> {}" + e.getMessage()); + disableCommit(); + }).onErrorResume(e -> Mono.empty()) + + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::processAdditionalFields).flatMap(this::publishToDmaapConfiguration) + .onErrorResume(e -> Mono.empty()) - - .doOnTerminate(mainCountDownLatch::countDown) - .subscribe(this::onSuccess, this::onError, this::onCompleteKafka); + + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onCompleteKafka); mainCountDownLatch.await(); - } catch (InterruptedException | JSONException e ) { + } catch (InterruptedException | JSONException e) { LOGGER.warn("Interruption problem on countDownLatch {}", e); Thread.currentThread().interrupt(); } } - private static void disableCommit() - { - pnfFound=false; + private static void disableCommit() { + pnfFound = false; } private void onCompleteKafka() { LOGGER.info("PRH tasks have been completed"); - if(pnfFound){ + if (pnfFound) { kafkaConsumerTask.commitOffset(); LOGGER.info("Committed the Offset"); - } - else - { + } else { LOGGER.info("Offset not Committed"); - pnfFound=true; + pnfFound = true; } } - private void onSuccess(MessageRouterPublishResponse response) { if (response.successful()) { String statusCodeOk = HttpStatus.OK.name(); @@ -167,23 +162,18 @@ public class ScheduledTasksWithCommit { } private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { - return aaiQueryTask - .execute(monoDMaaPModel) - .map(x -> new State(monoDMaaPModel, x)); + return aaiQueryTask.execute(monoDMaaPModel).map(x -> new State(monoDMaaPModel, x)); } private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) { - LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId()); + LOGGER.info("Find PNF --> " + monoDMaaPModel.getCorrelationId()); return aaiQueryTask.findPnfinAAI(monoDMaaPModel); } - private Mono<State> publishToAaiConfiguration(final State state) { try { - return aaiProducerTask - .execute(state.dmaapModel) - .map(x -> state); + return aaiProducerTask.execute(state.dmaapModel).map(x -> state); } catch (PrhTaskException e) { LOGGER.warn("AAIProducerTask exception has been registered: {}", e); return Mono.error(e); |