aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java82
1 files changed, 36 insertions, 46 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
index b0eae949..352c0bbc 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
@@ -33,8 +33,6 @@ import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -48,7 +46,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23
+ * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a>
+ * on 3/13/23
*/
@Profile("autoCommitDisabled")
@Component
@@ -59,7 +58,7 @@ public class ScheduledTasksWithCommit {
private KafkaConsumerTask kafkaConsumerTask;
private DmaapPublisherTask dmaapReadyProducerTask;
private DmaapPublisherTask dmaapUpdateProducerTask;
- private AaiQueryTask aaiQueryTask;
+ public AaiQueryTask aaiQueryTask;
private AaiProducerTask aaiProducerTask;
private BbsActionsTask bbsActionsTask;
private Map<String, String> mdcContextMap;
@@ -73,17 +72,16 @@ public class ScheduledTasksWithCommit {
* @param aaiPublisherTask - second task
*/
@Autowired
- public ScheduledTasksWithCommit(
- final KafkaConsumerTask kafkaConsumerTask,
- @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
- @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
- final AaiQueryTask aaiQueryTask,
- final AaiProducerTask aaiPublisherTask,
- final BbsActionsTask bbsActionsTask,
- final Map<String, String> mdcContextMap) {
+ public ScheduledTasksWithCommit(final KafkaConsumerTask kafkaConsumerTask,
+ @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+ @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+ final AaiQueryTask aaiQueryTask, final AaiProducerTask aaiPublisherTask,
+ final BbsActionsTask bbsActionsTask, final Map<String, String> mdcContextMap)
+
+ {
this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
- this.kafkaConsumerTask=kafkaConsumerTask;
+ this.kafkaConsumerTask = kafkaConsumerTask;
this.aaiQueryTask = aaiQueryTask;
this.aaiProducerTask = aaiPublisherTask;
this.bbsActionsTask = bbsActionsTask;
@@ -92,7 +90,7 @@ public class ScheduledTasksWithCommit {
static class State {
public ConsumerDmaapModel dmaapModel;
- public Boolean activationStatus;
+ public Boolean activationStatus;
public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
this.dmaapModel = dmaapModel;
@@ -103,50 +101,47 @@ public class ScheduledTasksWithCommit {
public void scheduleKafkaPrhEventTask() {
MdcVariables.setMdcContextMap(mdcContextMap);
try {
+
LOGGER.info("Execution of tasks was registered with commit");
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
consumeFromKafkaMessage()
- .flatMap(model->queryAaiForPnf(model)
- .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e);
- disableCommit();
- })
- .onErrorResume(e -> Mono.empty())
-
- )
- .flatMap(this::queryAaiForConfiguration)
- .flatMap(this::publishToAaiConfiguration)
- .flatMap(this::processAdditionalFields)
- .flatMap(this::publishToDmaapConfiguration)
+ .flatMap(model -> queryAaiForPnf(model).doOnError(e -> {
+ LOGGER.info("PNF Not Found in AAI --> {}" + e);
+ LOGGER.info("PNF Not Found in AAI With description of exception --> {}" + e.getMessage());
+ disableCommit();
+ }).onErrorResume(e -> Mono.empty())
+
+ )
+ .flatMap(this::queryAaiForConfiguration)
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::processAdditionalFields).flatMap(this::publishToDmaapConfiguration)
+
.onErrorResume(e -> Mono.empty())
-
- .doOnTerminate(mainCountDownLatch::countDown)
- .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
+
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
mainCountDownLatch.await();
- } catch (InterruptedException | JSONException e ) {
+ } catch (InterruptedException | JSONException e) {
LOGGER.warn("Interruption problem on countDownLatch {}", e);
Thread.currentThread().interrupt();
}
}
- private static void disableCommit()
- {
- pnfFound=false;
+ private static void disableCommit() {
+ pnfFound = false;
}
private void onCompleteKafka() {
LOGGER.info("PRH tasks have been completed");
- if(pnfFound){
+ if (pnfFound) {
kafkaConsumerTask.commitOffset();
LOGGER.info("Committed the Offset");
- }
- else
- {
+ } else {
LOGGER.info("Offset not Committed");
- pnfFound=true;
+ pnfFound = true;
}
}
-
private void onSuccess(MessageRouterPublishResponse response) {
if (response.successful()) {
String statusCodeOk = HttpStatus.OK.name();
@@ -167,23 +162,18 @@ public class ScheduledTasksWithCommit {
}
private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
- return aaiQueryTask
- .execute(monoDMaaPModel)
- .map(x -> new State(monoDMaaPModel, x));
+ return aaiQueryTask.execute(monoDMaaPModel).map(x -> new State(monoDMaaPModel, x));
}
private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
- LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId());
+ LOGGER.info("Find PNF --> " + monoDMaaPModel.getCorrelationId());
return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
}
-
private Mono<State> publishToAaiConfiguration(final State state) {
try {
- return aaiProducerTask
- .execute(state.dmaapModel)
- .map(x -> state);
+ return aaiProducerTask.execute(state.dmaapModel).map(x -> state);
} catch (PrhTaskException e) {
LOGGER.warn("AAIProducerTask exception has been registered: {}", e);
return Mono.error(e);