diff options
author | sushant53 <sushant.jadhav@t-systems.com> | 2024-02-29 11:51:05 +0530 |
---|---|---|
committer | Sushant Jadhav <sushant.jadhav@t-systems.com> | 2024-03-05 09:29:55 +0000 |
commit | 0720a8a4e336d516ee00c515a392bb48a23404fd (patch) | |
tree | 5f81e02faa0d9b57d739a230484a91f4aa2900e7 /bpmn/so-bpmn-infrastructure-common/src/main/java/org | |
parent | 0420dbbef6cf04a662dadcd84c6a4682e3a412fc (diff) |
[SO] Code improvement in bpmn-infra supporting kafka change1.13.0
Code improvement in bpmn-infra supporting kafka change
Issue-ID: SO-4122
Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'bpmn/so-bpmn-infrastructure-common/src/main/java/org')
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java) | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java) | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java) | 2 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java) | 4 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java) | 36 |
6 files changed, 39 insertions, 39 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java index 439591a295..d0a6e3a59f 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java @@ -22,23 +22,23 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class CancelDmaapSubscription implements JavaDelegate { +public class CancelKafkaSubscription implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - dmaapClient.unregister(pnfCorrelationId); + kafkaClient.unregister(pnfCorrelationId); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java index 5cbd530a93..6506450c52 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java @@ -23,25 +23,25 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class InformDmaapClient implements JavaDelegate { +public class InformKafkaClient implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java index 9e1b5d5a0b..ef5da92e78 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java @@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; import org.onap.so.bpmn.common.BuildingBlockExecution; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf; import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey; import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB; @@ -45,19 +45,19 @@ import org.springframework.stereotype.Component; public class RegisterForPnfReadyEvent implements JavaDelegate { private static final String ERROR_MESSAGE_PNF_NOT_FOUND = - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"; + "pnf resource not found in buildingBlockExecution while registering to kafka listener"; private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class); - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; private ExtractPojosForBB extractPojosForBB; private ExceptionBuilder exceptionBuilder; private String pnfEntryNotificationTimeout; @Autowired - public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB, + public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB, ExceptionBuilder exceptionBuilder, @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) { - this.dmaapClient = dmaapClient; + this.kafkaClient = kafkaClient; this.extractPojosForBB = extractPojosForBB; this.exceptionBuilder = exceptionBuilder; this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout; @@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate { String pnfName = getPnfName(execution); fillExecution(execution, pnfName); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceId(execution.getProcessInstanceId()).correlateWithResult()); } catch (BBObjectNotFoundException e) { LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java index 9cb566f49b..3c65cbaad3 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java @@ -21,7 +21,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import com.google.gson.JsonArray; import com.google.gson.JsonElement; diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java index fd7eb153b6..941c565bca 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java @@ -19,9 +19,9 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; -public interface DmaapClient { +public interface KafkaClient { void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java index 44b16dad28..0d3e0e0230 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -19,7 +19,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import java.io.IOException; import java.util.Collections; @@ -36,12 +36,12 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); private Map<String, Runnable> pnfCorrelationIdToThreadMap; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; + private volatile boolean kafkaThreadListenerIsRunning; private KafkaConsumerImpl consumerForPnfReady; private KafkaConsumerImpl consumerForPnfUpdate; private String pnfReadyTopic; @@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { + public PnfEventReadyKafkaClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); executor = null; try { consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); @@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); } } @@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient { if (pnfCorrelationIdToThreadMap.isEmpty()) { consumerForPnfUpdate.close(); consumerForPnfReady.close(); - stopDmaapThreadListener(); + stopKafkaThreadListener(); } return runnable; } - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { executor = new ScheduledThreadPoolExecutor(1); executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + kafkaThreadListenerIsRunning = true; } } - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { executor.shutdown(); - dmaapThreadListenerIsRunning = false; + kafkaThreadListenerIsRunning = false; executor = null; } } - class DmaapTopicListenerThread implements Runnable { + class KafkaTopicListenerThread implements Runnable { @Override public void run() { try { @@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { Runnable runnable = unregister(pnfCorrelationId); if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); runnable.run(); } } |