aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java79
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java82
3 files changed, 94 insertions, 74 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java
index 30e6cff1..6b289f1c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java
@@ -22,10 +22,8 @@ package org.onap.dcaegen2.services.prh.tasks.commit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
@@ -33,50 +31,72 @@ import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
-
import java.util.ArrayList;
import java.util.List;
/**
- * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on 3/13/23
+ * @author <a href="mailto:ajinkya-patil@t-systems.com">Ajinkya Patil</a> on
+ * 3/13/23
*/
@Profile("autoCommitDisabled")
@Component
public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener<String, String> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class);
-
- @Autowired
+
+
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- @Autowired
private EpochDateTimeConversion epochDateTimeConversion;
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode;
+
private List<String> jsonEvent = new ArrayList<>();
+ public List<String> getJsonEvent() {
+ return jsonEvent;
+ }
+
private Acknowledgment offset;
- String kafkaTopic = System.getenv("kafkaTopic");
+ public Acknowledgment getOffset() {
+ return offset;
+ }
+
+ static String commonInURL = "/events/";
+
+ String kafkaTopic;
- String groupIdConfig = System.getenv("groupIdConfig");
+ String groupIdConfig;
+
+
+ public KafkaConsumerTaskImpl(CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode
+ ,DmaapConsumerJsonParser dmaapConsumerJsonParser,EpochDateTimeConversion epochDateTimeConversion) {
+ this.cbsConfigurationForAutoCommitDisabledMode = cbsConfigurationForAutoCommitDisabledMode;
+ this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
+ this.epochDateTimeConversion = epochDateTimeConversion;
+ String kafkaTopicURL = this.cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest()
+ .sourceDefinition().topicUrl();
+ kafkaTopic = getTopicFromTopicUrl(kafkaTopicURL);
+ groupIdConfig = cbsConfigurationForAutoCommitDisabledMode.getMessageRouterSubscribeRequest().consumerGroup();
+
+ System.setProperty("kafkaTopic", kafkaTopic);
+ System.setProperty("groupIdConfig", groupIdConfig);
+
+ }
@Override
@KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}")
public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) {
-
- if (list != null && !list.isEmpty()) {
-
-
- list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay())
- .map(ConsumerRecord::value)
- .forEach(value -> {
- jsonEvent.add(value);
+
+ if (list != null && !list.isEmpty()) {
+ list.stream().filter(
+ consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay()
+ && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay())
+ .map(ConsumerRecord::value).forEach(value -> {
+ jsonEvent.add(value);
});
-
}
-
offset = acknowledgment;
}
@@ -86,14 +106,25 @@ public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledg
return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent);
}
+ public void setJsonEvent(List<String> jsonEvent) {
+ this.jsonEvent = jsonEvent;
+ }
+
@Override
public void commitOffset() {
- if(!jsonEvent.isEmpty()){
+ if (!jsonEvent.isEmpty()) {
jsonEvent.clear();
}
- if(offset != null){
+ if (offset != null) {
offset.acknowledge();
}
}
+ public String getTopicFromTopicUrl(String topicUrl) {
+ if (topicUrl.endsWith("/")) {
+ return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length(), topicUrl.lastIndexOf("/"));
+ }
+ return topicUrl.substring(topicUrl.indexOf(commonInURL) + commonInURL.length());
+ }
+
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
index 64d7798e..91cdd122 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@@ -52,7 +51,6 @@ public class ScheduledTasksRunnerWithCommit {
private final TaskScheduler taskScheduler;
private final PrhProperties prhProperties;
- @Autowired
private ScheduledTasksWithCommit scheduledTasksWithCommit;
public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler, ScheduledTasksWithCommit scheduledTasksWithCommit,
@@ -64,7 +62,8 @@ public class ScheduledTasksRunnerWithCommit {
@EventListener
public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) {
- tryToStartTaskWithCommit();
+ LOGGER.info(ENTRY,"### in onApplicationStartedEvent");
+ LOGGER.info(ENTRY,"###tryToStartTaskWithCommit="+tryToStartTaskWithCommit());
}
/**
@@ -72,6 +71,7 @@ public class ScheduledTasksRunnerWithCommit {
*/
@PreDestroy
public synchronized void cancelTasks() {
+ LOGGER.info(ENTRY,"###In cancelTasks");
scheduledPrhTaskFutureList.forEach(x -> x.cancel(false));
scheduledPrhTaskFutureList.clear();
}
@@ -96,4 +96,3 @@ public class ScheduledTasksRunnerWithCommit {
}
}
-
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
index b0eae949..352c0bbc 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java
@@ -33,8 +33,6 @@ import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -48,7 +46,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23
+ * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a>
+ * on 3/13/23
*/
@Profile("autoCommitDisabled")
@Component
@@ -59,7 +58,7 @@ public class ScheduledTasksWithCommit {
private KafkaConsumerTask kafkaConsumerTask;
private DmaapPublisherTask dmaapReadyProducerTask;
private DmaapPublisherTask dmaapUpdateProducerTask;
- private AaiQueryTask aaiQueryTask;
+ public AaiQueryTask aaiQueryTask;
private AaiProducerTask aaiProducerTask;
private BbsActionsTask bbsActionsTask;
private Map<String, String> mdcContextMap;
@@ -73,17 +72,16 @@ public class ScheduledTasksWithCommit {
* @param aaiPublisherTask - second task
*/
@Autowired
- public ScheduledTasksWithCommit(
- final KafkaConsumerTask kafkaConsumerTask,
- @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
- @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
- final AaiQueryTask aaiQueryTask,
- final AaiProducerTask aaiPublisherTask,
- final BbsActionsTask bbsActionsTask,
- final Map<String, String> mdcContextMap) {
+ public ScheduledTasksWithCommit(final KafkaConsumerTask kafkaConsumerTask,
+ @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+ @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+ final AaiQueryTask aaiQueryTask, final AaiProducerTask aaiPublisherTask,
+ final BbsActionsTask bbsActionsTask, final Map<String, String> mdcContextMap)
+
+ {
this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
- this.kafkaConsumerTask=kafkaConsumerTask;
+ this.kafkaConsumerTask = kafkaConsumerTask;
this.aaiQueryTask = aaiQueryTask;
this.aaiProducerTask = aaiPublisherTask;
this.bbsActionsTask = bbsActionsTask;
@@ -92,7 +90,7 @@ public class ScheduledTasksWithCommit {
static class State {
public ConsumerDmaapModel dmaapModel;
- public Boolean activationStatus;
+ public Boolean activationStatus;
public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
this.dmaapModel = dmaapModel;
@@ -103,50 +101,47 @@ public class ScheduledTasksWithCommit {
public void scheduleKafkaPrhEventTask() {
MdcVariables.setMdcContextMap(mdcContextMap);
try {
+
LOGGER.info("Execution of tasks was registered with commit");
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
consumeFromKafkaMessage()
- .flatMap(model->queryAaiForPnf(model)
- .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e);
- disableCommit();
- })
- .onErrorResume(e -> Mono.empty())
-
- )
- .flatMap(this::queryAaiForConfiguration)
- .flatMap(this::publishToAaiConfiguration)
- .flatMap(this::processAdditionalFields)
- .flatMap(this::publishToDmaapConfiguration)
+ .flatMap(model -> queryAaiForPnf(model).doOnError(e -> {
+ LOGGER.info("PNF Not Found in AAI --> {}" + e);
+ LOGGER.info("PNF Not Found in AAI With description of exception --> {}" + e.getMessage());
+ disableCommit();
+ }).onErrorResume(e -> Mono.empty())
+
+ )
+ .flatMap(this::queryAaiForConfiguration)
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::processAdditionalFields).flatMap(this::publishToDmaapConfiguration)
+
.onErrorResume(e -> Mono.empty())
-
- .doOnTerminate(mainCountDownLatch::countDown)
- .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
+
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
mainCountDownLatch.await();
- } catch (InterruptedException | JSONException e ) {
+ } catch (InterruptedException | JSONException e) {
LOGGER.warn("Interruption problem on countDownLatch {}", e);
Thread.currentThread().interrupt();
}
}
- private static void disableCommit()
- {
- pnfFound=false;
+ private static void disableCommit() {
+ pnfFound = false;
}
private void onCompleteKafka() {
LOGGER.info("PRH tasks have been completed");
- if(pnfFound){
+ if (pnfFound) {
kafkaConsumerTask.commitOffset();
LOGGER.info("Committed the Offset");
- }
- else
- {
+ } else {
LOGGER.info("Offset not Committed");
- pnfFound=true;
+ pnfFound = true;
}
}
-
private void onSuccess(MessageRouterPublishResponse response) {
if (response.successful()) {
String statusCodeOk = HttpStatus.OK.name();
@@ -167,23 +162,18 @@ public class ScheduledTasksWithCommit {
}
private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
- return aaiQueryTask
- .execute(monoDMaaPModel)
- .map(x -> new State(monoDMaaPModel, x));
+ return aaiQueryTask.execute(monoDMaaPModel).map(x -> new State(monoDMaaPModel, x));
}
private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
- LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId());
+ LOGGER.info("Find PNF --> " + monoDMaaPModel.getCorrelationId());
return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
}
-
private Mono<State> publishToAaiConfiguration(final State state) {
try {
- return aaiProducerTask
- .execute(state.dmaapModel)
- .map(x -> state);
+ return aaiProducerTask.execute(state.dmaapModel).map(x -> state);
} catch (PrhTaskException e) {
LOGGER.warn("AAIProducerTask exception has been registered: {}", e);
return Mono.error(e);