diff options
author | sushant53 <sushant.jadhav@t-systems.com> | 2024-02-29 11:51:05 +0530 |
---|---|---|
committer | Sushant Jadhav <sushant.jadhav@t-systems.com> | 2024-03-05 09:29:55 +0000 |
commit | 0720a8a4e336d516ee00c515a392bb48a23404fd (patch) | |
tree | 5f81e02faa0d9b57d739a230484a91f4aa2900e7 /bpmn/so-bpmn-infrastructure-common | |
parent | 0420dbbef6cf04a662dadcd84c6a4682e3a412fc (diff) |
[SO] Code improvement in bpmn-infra supporting kafka change1.13.0
Code improvement in bpmn-infra supporting kafka change
Issue-ID: SO-4122
Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'bpmn/so-bpmn-infrastructure-common')
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java) | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java) | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java) | 2 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java) | 4 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java) | 36 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java) | 12 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java) | 28 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java) | 4 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java | 22 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java) | 2 | ||||
-rw-r--r-- | bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java) | 24 |
12 files changed, 85 insertions, 85 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java index 439591a295..d0a6e3a59f 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java @@ -22,23 +22,23 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class CancelDmaapSubscription implements JavaDelegate { +public class CancelKafkaSubscription implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - dmaapClient.unregister(pnfCorrelationId); + kafkaClient.unregister(pnfCorrelationId); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java index 5cbd530a93..6506450c52 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java @@ -23,25 +23,25 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class InformDmaapClient implements JavaDelegate { +public class InformKafkaClient implements JavaDelegate { - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); } @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java index 9e1b5d5a0b..ef5da92e78 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java @@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; import org.onap.so.bpmn.common.BuildingBlockExecution; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf; import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey; import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB; @@ -45,19 +45,19 @@ import org.springframework.stereotype.Component; public class RegisterForPnfReadyEvent implements JavaDelegate { private static final String ERROR_MESSAGE_PNF_NOT_FOUND = - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"; + "pnf resource not found in buildingBlockExecution while registering to kafka listener"; private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class); - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; private ExtractPojosForBB extractPojosForBB; private ExceptionBuilder exceptionBuilder; private String pnfEntryNotificationTimeout; @Autowired - public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB, + public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB, ExceptionBuilder exceptionBuilder, @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) { - this.dmaapClient = dmaapClient; + this.kafkaClient = kafkaClient; this.extractPojosForBB = extractPojosForBB; this.exceptionBuilder = exceptionBuilder; this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout; @@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate { String pnfName = getPnfName(execution); fillExecution(execution, pnfName); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceId(execution.getProcessInstanceId()).correlateWithResult()); } catch (BBObjectNotFoundException e) { LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java index 9cb566f49b..3c65cbaad3 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java @@ -21,7 +21,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import com.google.gson.JsonArray; import com.google.gson.JsonElement; diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java index fd7eb153b6..941c565bca 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java @@ -19,9 +19,9 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; -public interface DmaapClient { +public interface KafkaClient { void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java index 44b16dad28..0d3e0e0230 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -19,7 +19,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import java.io.IOException; import java.util.Collections; @@ -36,12 +36,12 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); private Map<String, Runnable> pnfCorrelationIdToThreadMap; private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; + private volatile boolean kafkaThreadListenerIsRunning; private KafkaConsumerImpl consumerForPnfReady; private KafkaConsumerImpl consumerForPnfUpdate; private String pnfReadyTopic; @@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { + public PnfEventReadyKafkaClient(Environment env) throws IOException { pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); executor = null; try { consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); @@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); } } @@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient { if (pnfCorrelationIdToThreadMap.isEmpty()) { consumerForPnfUpdate.close(); consumerForPnfReady.close(); - stopDmaapThreadListener(); + stopKafkaThreadListener(); } return runnable; } - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { executor = new ScheduledThreadPoolExecutor(1); executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + kafkaThreadListenerIsRunning = true; } } - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { executor.shutdown(); - dmaapThreadListenerIsRunning = false; + kafkaThreadListenerIsRunning = false; executor = null; } } - class DmaapTopicListenerThread implements Runnable { + class KafkaTopicListenerThread implements Runnable { @Override public void run() { try { @@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { Runnable runnable = unregister(pnfCorrelationId); if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); runnable.run(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java index c2e87d57bf..6230dab3fa 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java @@ -27,25 +27,25 @@ import static org.mockito.Mockito.when; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.junit.Test; -public class CancelDmaapSubscriptionTest { +public class CancelKafkaSubscriptionTest { private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId"; @Test public void shouldCancelSubscription() { // given - CancelDmaapSubscription delegate = new CancelDmaapSubscription(); - DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl(); - delegate.setDmaapClient(dmaapClientTest); + CancelKafkaSubscription delegate = new CancelKafkaSubscription(); + KafkaClientTestImpl kafkaClientTest = new KafkaClientTestImpl(); + delegate.setKafkaClient(kafkaClientTest); DelegateExecution delegateExecution = mock(DelegateExecution.class); when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) .thenReturn(TEST_PNF_CORRELATION_ID); when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); - dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> { + kafkaClientTest.registerForUpdate("testPnfCorrelationId", () -> { }); // when delegate.execute(delegateExecution); // then - assertThat(dmaapClientTest.haveRegisteredConsumer()).isFalse(); + assertThat(kafkaClientTest.haveRegisteredConsumer()).isFalse(); } } diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java index 94aa1427a4..d8a102f2b4 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java @@ -35,40 +35,40 @@ import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; -public class InformDmaapClientTest { +public class InformKafkaClientTest { @Before public void setUp() { - informDmaapClient = new InformDmaapClient(); - dmaapClientTest = new DmaapClientTestImpl(); - informDmaapClient.setDmaapClient(dmaapClientTest); + informKafkaClient = new InformKafkaClient(); + kafkaClientTest = new KafkaClientTestImpl(); + informKafkaClient.setKafkaClient(kafkaClientTest); delegateExecution = mockDelegateExecution(); } - private InformDmaapClient informDmaapClient; + private InformKafkaClient informKafkaClient; - private DmaapClientTestImpl dmaapClientTest; + private KafkaClientTestImpl kafkaClientTest; private DelegateExecution delegateExecution; private MessageCorrelationBuilder messageCorrelationBuilder; @Test - public void shouldSendListenerToDmaapClient() { + public void shouldSendListenerToKafkaClient() { // when - informDmaapClient.execute(delegateExecution); + informKafkaClient.execute(delegateExecution); // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); - assertThat(dmaapClientTest.getInformConsumer()).isNotNull(); + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + assertThat(kafkaClientTest.getInformConsumer()).isNotNull(); verifyZeroInteractions(messageCorrelationBuilder); } @Test - public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() { + public void shouldSendListenerToKafkaClientAndSendMessageToCamunda() { // when - informDmaapClient.execute(delegateExecution); - dmaapClientTest.getInformConsumer().run(); + informKafkaClient.execute(delegateExecution); + kafkaClientTest.getInformConsumer().run(); // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); InOrder inOrder = inOrder(messageCorrelationBuilder); inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey"); inOrder.verify(messageCorrelationBuilder).correlateWithResult(); diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java index 0ec0ac8214..c374ba40c2 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java @@ -22,9 +22,9 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; import java.util.Objects; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; -public class DmaapClientTestImpl implements DmaapClient { +public class KafkaClientTestImpl implements KafkaClient { private String pnfCorrelationId; private Runnable informConsumer; diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java index 7ec05fda04..6422e5f88e 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java @@ -29,7 +29,7 @@ public class RegisterForPnfReadyEventTest { private DelegateExecution delegateExecution; private ExtractPojosForBB extractPojosForBBMock; - private DmaapClientTestImpl dmaapClientTest; + private KafkaClientTestImpl kafkaClientTest; private MessageCorrelationBuilder messageCorrelationBuilder; private ExceptionBuilder exceptionBuilderMock; private BuildingBlockExecution buildingBlockExecution; @@ -39,7 +39,7 @@ public class RegisterForPnfReadyEventTest { @Before public void init() { delegateExecution = prepareExecution(); - dmaapClientTest = new DmaapClientTestImpl(); + kafkaClientTest = new KafkaClientTestImpl(); exceptionBuilderMock = mock(ExceptionBuilder.class); extractPojosForBBMock = mock(ExtractPojosForBB.class); buildingBlockExecution = new DelegateExecutionImpl(new HashMap<>()); @@ -47,9 +47,9 @@ public class RegisterForPnfReadyEventTest { } @Test - public void shouldRegisterForDmaapClient() throws BBObjectNotFoundException { + public void shouldRegisterForKafkaClient() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); Pnf pnf = new Pnf(); pnf.setPnfName(PNF_NAME); @@ -60,13 +60,13 @@ public class RegisterForPnfReadyEventTest { verify(delegateExecution).setVariable(ExecutionVariableNames.PNF_CORRELATION_ID, PNF_NAME); verify(delegateExecution).setVariable(ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION, PNF_ENTRY_NOTIFICATION_TIMEOUT); - checkIfInformConsumerThreadIsRunProperly(dmaapClientTest); + checkIfInformConsumerThreadIsRunProperly(kafkaClientTest); } @Test public void pnfNotFoundInBBexecution_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)) .thenThrow(BBObjectNotFoundException.class); @@ -74,13 +74,13 @@ public class RegisterForPnfReadyEventTest { testedObject.execute(delegateExecution); // then verify(exceptionBuilderMock).buildAndThrowWorkflowException(delegateExecution, 7000, - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"); + "pnf resource not found in buildingBlockExecution while registering to kafka listener"); } @Test public void pnfNameIsNull_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when @@ -92,7 +92,7 @@ public class RegisterForPnfReadyEventTest { @Test public void pnfEventNotificationTimeoutNotSet_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, null); + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, null); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when testedObject.execute(delegateExecution); @@ -101,8 +101,8 @@ public class RegisterForPnfReadyEventTest { "pnfEntryNotificationTimeout value not defined"); } - private void checkIfInformConsumerThreadIsRunProperly(DmaapClientTestImpl dmaapClientTest) { - dmaapClientTest.getInformConsumer().run(); + private void checkIfInformConsumerThreadIsRunProperly(KafkaClientTestImpl kafkaClientTest) { + kafkaClientTest.getInformConsumer().run(); InOrder inOrder = inOrder(messageCorrelationBuilder); inOrder.verify(messageCorrelationBuilder).processInstanceId(PROCESS_INSTANCE_ID); inOrder.verify(messageCorrelationBuilder).correlateWithResult(); diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java index f9e4cb4c88..71d60305e4 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java index 546e644fbd..41887e3586 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; +package org.onap.so.bpmn.infrastructure.pnf.kafka; import static org.junit.Assert.assertEquals; @@ -42,13 +42,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; +import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread; import org.onap.so.client.kafka.KafkaConsumerImpl; import org.springframework.core.env.Environment; @RunWith(MockitoJUnitRunner.class) -public class PnfEventReadyDmaapClientTest { +public class PnfEventReadyKafkaClientTest { private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; private static final String PNF_CORRELATION_ID = "corrTestId"; private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; @@ -66,9 +66,9 @@ public class PnfEventReadyDmaapClientTest { @Mock private Environment env; - private PnfEventReadyDmaapClient testedObject; + private PnfEventReadyKafkaClient testedObject; - private DmaapTopicListenerThread testedObjectInnerClassThread; + private KafkaTopicListenerThread testedObjectInnerClassThread; private KafkaConsumerImpl kafkaConsumerMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledThreadPoolExecutor executorMock; @@ -81,10 +81,10 @@ public class PnfEventReadyDmaapClientTest { when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); - when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) + when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class))) .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); - testedObject = new PnfEventReadyDmaapClient(env); - testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); + testedObject = new PnfEventReadyKafkaClient(env); + testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread(); kafkaConsumerMock = mock(KafkaConsumerImpl.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledThreadPoolExecutor.class); @@ -94,7 +94,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: * <p> - * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true * <p> * - map is filled with one entry with the key that we get from response * <p> @@ -130,7 +130,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: * <p> - * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true * <p> * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http * response. run method should not do anything with the map not run any thread to notify camunda process @@ -147,7 +147,7 @@ public class PnfEventReadyDmaapClientTest { /** * Test run method, where the are following conditions: * <p> - * - DmaapThreadListener is running, flag is set to true + * - KafkaThreadListener is running, flag is set to true * <p> * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run * method should not do anything with the map and not run any thread to notify camunda process @@ -182,7 +182,7 @@ public class PnfEventReadyDmaapClientTest { pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow); pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap); - Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning"); + Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning"); threadRunFlag.setAccessible(true); threadRunFlag.set(testedObject, true); threadRunFlag.setAccessible(false); |