aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsushant53 <sushant.jadhav@t-systems.com>2024-02-29 11:51:05 +0530
committerSushant Jadhav <sushant.jadhav@t-systems.com>2024-03-05 09:29:55 +0000
commit0720a8a4e336d516ee00c515a392bb48a23404fd (patch)
tree5f81e02faa0d9b57d739a230484a91f4aa2900e7
parent0420dbbef6cf04a662dadcd84c6a4682e3a412fc (diff)
[SO] Code improvement in bpmn-infra supporting kafka change1.13.0
Code improvement in bpmn-infra supporting kafka change Issue-ID: SO-4122 Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2 Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
-rw-r--r--bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java20
-rw-r--r--bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java (renamed from bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java)5
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java)12
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java)12
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java12
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java)2
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java)4
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java (renamed from bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java)36
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java)12
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java)28
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java)4
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java22
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java)2
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java (renamed from bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java)24
-rw-r--r--bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java1
15 files changed, 98 insertions, 98 deletions
diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java
index 0c001b1192..32ec69e5fa 100644
--- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java
+++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java
@@ -49,7 +49,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
private PnfManagementTestImpl pnfManagementTest;
@Autowired
- private DmaapClientTestImpl dmaapClientTestImpl;
+ private KafkaClientTestImpl kafkaClientTestImpl;
@Before
public void setup() {
@@ -60,7 +60,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
}
@Test
- public void shouldWaitForMessageFromDmaapAndUpdateAaiEntryWhenAaiEntryExists() {
+ public void shouldWaitForMessageFromKafkaAndUpdateAaiEntryWhenAaiEntryExists() {
// given
variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITH_ENTRY);
ResourceInput ri = getUpdateResInputObj("OLT");
@@ -72,19 +72,19 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
// when
ProcessInstance instance =
runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables);
- assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage");
- dmaapClientTestImpl.sendMessage();
+ assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage");
+ kafkaClientTestImpl.sendMessage();
// then
assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs",
- "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformDmaapClient",
- "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
+ "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformKafkaClient",
+ "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap())
.containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITH_ENTRY));
}
@Test
- public void shouldCreateAaiEntryWaitForMessageFromDmaapAndUpdateAaiEntryWhenNoAaiEntryExists() {
+ public void shouldCreateAaiEntryWaitForMessageFromKafkaAndUpdateAaiEntryWhenNoAaiEntryExists() {
// given
variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY);
ResourceInput ri = getUpdateResInputObj("OLT");
@@ -96,13 +96,13 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest {
// when
ProcessInstance instance =
runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables);
- assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage");
- dmaapClientTestImpl.sendMessage();
+ assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage");
+ kafkaClientTestImpl.sendMessage();
// then
assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs",
"CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "CreatePnfEntryInAai", "AaiEntryExists",
- "InformDmaapClient", "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
+ "InformKafkaClient", "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated");
Assertions.assertThat(pnfManagementTest.getCreated()).containsOnlyKeys(PnfManagementTestImpl.ID_WITHOUT_ENTRY);
Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap())
.containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY));
diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java
index 43fbc59b3d..e4c1717752 100644
--- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
+++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java
@@ -20,15 +20,14 @@
package org.onap.so.bpmn.infrastructure.pnf.delegate;
-import java.util.Map;
import java.util.Objects;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
-public class DmaapClientTestImpl implements DmaapClient {
+public class KafkaClientTestImpl implements KafkaClient {
private String pnfCorrelationId;
private Runnable informConsumer;
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java
index 439591a295..d0a6e3a59f 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java
@@ -22,23 +22,23 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class CancelDmaapSubscription implements JavaDelegate {
+public class CancelKafkaSubscription implements JavaDelegate {
- private DmaapClient dmaapClient;
+ private KafkaClient kafkaClient;
@Override
public void execute(DelegateExecution execution) {
String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
- dmaapClient.unregister(pnfCorrelationId);
+ kafkaClient.unregister(pnfCorrelationId);
}
@Autowired
- public void setDmaapClient(DmaapClient dmaapClient) {
- this.dmaapClient = dmaapClient;
+ public void setKafkaClient(KafkaClient kafkaClient) {
+ this.kafkaClient = kafkaClient;
}
}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java
index 5cbd530a93..6506450c52 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java
@@ -23,25 +23,25 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class InformDmaapClient implements JavaDelegate {
+public class InformKafkaClient implements JavaDelegate {
- private DmaapClient dmaapClient;
+ private KafkaClient kafkaClient;
@Override
public void execute(DelegateExecution execution) {
String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
- dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+ kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
.processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
}
@Autowired
- public void setDmaapClient(DmaapClient dmaapClient) {
- this.dmaapClient = dmaapClient;
+ public void setKafkaClient(KafkaClient kafkaClient) {
+ this.kafkaClient = kafkaClient;
}
}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java
index 9e1b5d5a0b..ef5da92e78 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java
@@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.onap.so.bpmn.common.BuildingBlockExecution;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf;
import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey;
import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB;
@@ -45,19 +45,19 @@ import org.springframework.stereotype.Component;
public class RegisterForPnfReadyEvent implements JavaDelegate {
private static final String ERROR_MESSAGE_PNF_NOT_FOUND =
- "pnf resource not found in buildingBlockExecution while registering to dmaap listener";
+ "pnf resource not found in buildingBlockExecution while registering to kafka listener";
private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class);
- private DmaapClient dmaapClient;
+ private KafkaClient kafkaClient;
private ExtractPojosForBB extractPojosForBB;
private ExceptionBuilder exceptionBuilder;
private String pnfEntryNotificationTimeout;
@Autowired
- public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB,
+ public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB,
ExceptionBuilder exceptionBuilder,
@Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) {
- this.dmaapClient = dmaapClient;
+ this.kafkaClient = kafkaClient;
this.extractPojosForBB = extractPojosForBB;
this.exceptionBuilder = exceptionBuilder;
this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout;
@@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate {
String pnfName = getPnfName(execution);
fillExecution(execution, pnfName);
RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
- dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+ kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
.processInstanceId(execution.getProcessInstanceId()).correlateWithResult());
} catch (BBObjectNotFoundException e) {
LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND);
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java
index 9cb566f49b..3c65cbaad3 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java
@@ -21,7 +21,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java
index fd7eb153b6..941c565bca 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java
@@ -19,9 +19,9 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
-public interface DmaapClient {
+public interface KafkaClient {
void registerForUpdate(String pnfCorrelationId, Runnable informConsumer);
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java
index 44b16dad28..0d3e0e0230 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java
@@ -19,7 +19,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
import java.io.IOException;
import java.util.Collections;
@@ -36,12 +36,12 @@ import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
-public class PnfEventReadyDmaapClient implements DmaapClient {
- private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class);
+public class PnfEventReadyKafkaClient implements KafkaClient {
+ private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class);
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
- private volatile boolean dmaapThreadListenerIsRunning;
+ private volatile boolean kafkaThreadListenerIsRunning;
private KafkaConsumerImpl consumerForPnfReady;
private KafkaConsumerImpl consumerForPnfUpdate;
private String pnfReadyTopic;
@@ -53,9 +53,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
@Autowired
- public PnfEventReadyDmaapClient(Environment env) throws IOException {
+ public PnfEventReadyKafkaClient(Environment env) throws IOException {
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
- topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
+ topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class);
executor = null;
try {
consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
@@ -75,8 +75,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId);
pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
- if (!dmaapThreadListenerIsRunning) {
- startDmaapThreadListener();
+ if (!kafkaThreadListenerIsRunning) {
+ startKafkaThreadListener();
}
}
@@ -87,31 +87,31 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
if (pnfCorrelationIdToThreadMap.isEmpty()) {
consumerForPnfUpdate.close();
consumerForPnfReady.close();
- stopDmaapThreadListener();
+ stopKafkaThreadListener();
}
return runnable;
}
- private synchronized void startDmaapThreadListener() {
- if (!dmaapThreadListenerIsRunning) {
+ private synchronized void startKafkaThreadListener() {
+ if (!kafkaThreadListenerIsRunning) {
executor = new ScheduledThreadPoolExecutor(1);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds,
+ executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds,
TimeUnit.SECONDS);
- dmaapThreadListenerIsRunning = true;
+ kafkaThreadListenerIsRunning = true;
}
}
- private synchronized void stopDmaapThreadListener() {
- if (dmaapThreadListenerIsRunning) {
+ private synchronized void stopKafkaThreadListener() {
+ if (kafkaThreadListenerIsRunning) {
executor.shutdown();
- dmaapThreadListenerIsRunning = false;
+ kafkaThreadListenerIsRunning = false;
executor = null;
}
}
- class DmaapTopicListenerThread implements Runnable {
+ class KafkaTopicListenerThread implements Runnable {
@Override
public void run() {
try {
@@ -141,7 +141,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {
- logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
+ logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
runnable.run();
}
}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java
index c2e87d57bf..6230dab3fa 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java
@@ -27,25 +27,25 @@ import static org.mockito.Mockito.when;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.junit.Test;
-public class CancelDmaapSubscriptionTest {
+public class CancelKafkaSubscriptionTest {
private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId";
@Test
public void shouldCancelSubscription() {
// given
- CancelDmaapSubscription delegate = new CancelDmaapSubscription();
- DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl();
- delegate.setDmaapClient(dmaapClientTest);
+ CancelKafkaSubscription delegate = new CancelKafkaSubscription();
+ KafkaClientTestImpl kafkaClientTest = new KafkaClientTestImpl();
+ delegate.setKafkaClient(kafkaClientTest);
DelegateExecution delegateExecution = mock(DelegateExecution.class);
when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
.thenReturn(TEST_PNF_CORRELATION_ID);
when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey");
- dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> {
+ kafkaClientTest.registerForUpdate("testPnfCorrelationId", () -> {
});
// when
delegate.execute(delegateExecution);
// then
- assertThat(dmaapClientTest.haveRegisteredConsumer()).isFalse();
+ assertThat(kafkaClientTest.haveRegisteredConsumer()).isFalse();
}
}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java
index 94aa1427a4..d8a102f2b4 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java
@@ -35,40 +35,40 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
-public class InformDmaapClientTest {
+public class InformKafkaClientTest {
@Before
public void setUp() {
- informDmaapClient = new InformDmaapClient();
- dmaapClientTest = new DmaapClientTestImpl();
- informDmaapClient.setDmaapClient(dmaapClientTest);
+ informKafkaClient = new InformKafkaClient();
+ kafkaClientTest = new KafkaClientTestImpl();
+ informKafkaClient.setKafkaClient(kafkaClientTest);
delegateExecution = mockDelegateExecution();
}
- private InformDmaapClient informDmaapClient;
+ private InformKafkaClient informKafkaClient;
- private DmaapClientTestImpl dmaapClientTest;
+ private KafkaClientTestImpl kafkaClientTest;
private DelegateExecution delegateExecution;
private MessageCorrelationBuilder messageCorrelationBuilder;
@Test
- public void shouldSendListenerToDmaapClient() {
+ public void shouldSendListenerToKafkaClient() {
// when
- informDmaapClient.execute(delegateExecution);
+ informKafkaClient.execute(delegateExecution);
// then
- assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
- assertThat(dmaapClientTest.getInformConsumer()).isNotNull();
+ assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
+ assertThat(kafkaClientTest.getInformConsumer()).isNotNull();
verifyZeroInteractions(messageCorrelationBuilder);
}
@Test
- public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() {
+ public void shouldSendListenerToKafkaClientAndSendMessageToCamunda() {
// when
- informDmaapClient.execute(delegateExecution);
- dmaapClientTest.getInformConsumer().run();
+ informKafkaClient.execute(delegateExecution);
+ kafkaClientTest.getInformConsumer().run();
// then
- assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
+ assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId");
InOrder inOrder = inOrder(messageCorrelationBuilder);
inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey");
inOrder.verify(messageCorrelationBuilder).correlateWithResult();
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java
index 0ec0ac8214..c374ba40c2 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java
@@ -22,9 +22,9 @@
package org.onap.so.bpmn.infrastructure.pnf.delegate;
import java.util.Objects;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient;
-public class DmaapClientTestImpl implements DmaapClient {
+public class KafkaClientTestImpl implements KafkaClient {
private String pnfCorrelationId;
private Runnable informConsumer;
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java
index 7ec05fda04..6422e5f88e 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java
@@ -29,7 +29,7 @@ public class RegisterForPnfReadyEventTest {
private DelegateExecution delegateExecution;
private ExtractPojosForBB extractPojosForBBMock;
- private DmaapClientTestImpl dmaapClientTest;
+ private KafkaClientTestImpl kafkaClientTest;
private MessageCorrelationBuilder messageCorrelationBuilder;
private ExceptionBuilder exceptionBuilderMock;
private BuildingBlockExecution buildingBlockExecution;
@@ -39,7 +39,7 @@ public class RegisterForPnfReadyEventTest {
@Before
public void init() {
delegateExecution = prepareExecution();
- dmaapClientTest = new DmaapClientTestImpl();
+ kafkaClientTest = new KafkaClientTestImpl();
exceptionBuilderMock = mock(ExceptionBuilder.class);
extractPojosForBBMock = mock(ExtractPojosForBB.class);
buildingBlockExecution = new DelegateExecutionImpl(new HashMap<>());
@@ -47,9 +47,9 @@ public class RegisterForPnfReadyEventTest {
}
@Test
- public void shouldRegisterForDmaapClient() throws BBObjectNotFoundException {
+ public void shouldRegisterForKafkaClient() throws BBObjectNotFoundException {
// given
- testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+ testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
PNF_ENTRY_NOTIFICATION_TIMEOUT);
Pnf pnf = new Pnf();
pnf.setPnfName(PNF_NAME);
@@ -60,13 +60,13 @@ public class RegisterForPnfReadyEventTest {
verify(delegateExecution).setVariable(ExecutionVariableNames.PNF_CORRELATION_ID, PNF_NAME);
verify(delegateExecution).setVariable(ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION,
PNF_ENTRY_NOTIFICATION_TIMEOUT);
- checkIfInformConsumerThreadIsRunProperly(dmaapClientTest);
+ checkIfInformConsumerThreadIsRunProperly(kafkaClientTest);
}
@Test
public void pnfNotFoundInBBexecution_WorkflowExIsThrown() throws BBObjectNotFoundException {
// given
- testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+ testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
PNF_ENTRY_NOTIFICATION_TIMEOUT);
when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF))
.thenThrow(BBObjectNotFoundException.class);
@@ -74,13 +74,13 @@ public class RegisterForPnfReadyEventTest {
testedObject.execute(delegateExecution);
// then
verify(exceptionBuilderMock).buildAndThrowWorkflowException(delegateExecution, 7000,
- "pnf resource not found in buildingBlockExecution while registering to dmaap listener");
+ "pnf resource not found in buildingBlockExecution while registering to kafka listener");
}
@Test
public void pnfNameIsNull_WorkflowExIsThrown() throws BBObjectNotFoundException {
// given
- testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock,
+ testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock,
PNF_ENTRY_NOTIFICATION_TIMEOUT);
when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf());
// when
@@ -92,7 +92,7 @@ public class RegisterForPnfReadyEventTest {
@Test
public void pnfEventNotificationTimeoutNotSet_WorkflowExIsThrown() throws BBObjectNotFoundException {
// given
- testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, null);
+ testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, null);
when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf());
// when
testedObject.execute(delegateExecution);
@@ -101,8 +101,8 @@ public class RegisterForPnfReadyEventTest {
"pnfEntryNotificationTimeout value not defined");
}
- private void checkIfInformConsumerThreadIsRunProperly(DmaapClientTestImpl dmaapClientTest) {
- dmaapClientTest.getInformConsumer().run();
+ private void checkIfInformConsumerThreadIsRunProperly(KafkaClientTestImpl kafkaClientTest) {
+ kafkaClientTest.getInformConsumer().run();
InOrder inOrder = inOrder(messageCorrelationBuilder);
inOrder.verify(messageCorrelationBuilder).processInstanceId(PROCESS_INSTANCE_ID);
inOrder.verify(messageCorrelationBuilder).correlateWithResult();
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java
index f9e4cb4c88..71d60305e4 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java
@@ -20,7 +20,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java
index 546e644fbd..41887e3586 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java
@@ -20,7 +20,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+package org.onap.so.bpmn.infrastructure.pnf.kafka;
import static org.junit.Assert.assertEquals;
@@ -42,13 +42,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread;
import org.onap.so.client.kafka.KafkaConsumerImpl;
import org.springframework.core.env.Environment;
@RunWith(MockitoJUnitRunner.class)
-public class PnfEventReadyDmaapClientTest {
+public class PnfEventReadyKafkaClientTest {
private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
private static final String PNF_CORRELATION_ID = "corrTestId";
private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
@@ -66,9 +66,9 @@ public class PnfEventReadyDmaapClientTest {
@Mock
private Environment env;
- private PnfEventReadyDmaapClient testedObject;
+ private PnfEventReadyKafkaClient testedObject;
- private DmaapTopicListenerThread testedObjectInnerClassThread;
+ private KafkaTopicListenerThread testedObjectInnerClassThread;
private KafkaConsumerImpl kafkaConsumerMock;
private Runnable threadMockToNotifyCamundaFlow;
private ScheduledThreadPoolExecutor executorMock;
@@ -81,10 +81,10 @@ public class PnfEventReadyDmaapClientTest {
when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
- when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
+ when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class)))
.thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
- testedObject = new PnfEventReadyDmaapClient(env);
- testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
+ testedObject = new PnfEventReadyKafkaClient(env);
+ testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread();
kafkaConsumerMock = mock(KafkaConsumerImpl.class);
threadMockToNotifyCamundaFlow = mock(Runnable.class);
executorMock = mock(ScheduledThreadPoolExecutor.class);
@@ -94,7 +94,7 @@ public class PnfEventReadyDmaapClientTest {
/**
* Test run method, where the are following conditions:
* <p>
- * - DmaapThreadListener is running, flag is set to true
+ * - KafkaThreadListener is running, flag is set to true
* <p>
* - map is filled with one entry with the key that we get from response
* <p>
@@ -130,7 +130,7 @@ public class PnfEventReadyDmaapClientTest {
/**
* Test run method, where the are following conditions:
* <p>
- * - DmaapThreadListener is running, flag is set to true
+ * - KafkaThreadListener is running, flag is set to true
* <p>
* - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
* response. run method should not do anything with the map not run any thread to notify camunda process
@@ -147,7 +147,7 @@ public class PnfEventReadyDmaapClientTest {
/**
* Test run method, where the are following conditions:
* <p>
- * - DmaapThreadListener is running, flag is set to true
+ * - KafkaThreadListener is running, flag is set to true
* <p>
* - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
* method should not do anything with the map and not run any thread to notify camunda process
@@ -182,7 +182,7 @@ public class PnfEventReadyDmaapClientTest {
pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
- Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
+ Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning");
threadRunFlag.setAccessible(true);
threadRunFlag.set(testedObject, true);
threadRunFlag.setAccessible(false);
diff --git a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java
index 761219c6be..a44093298d 100644
--- a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java
+++ b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java
@@ -82,6 +82,7 @@ public class PnfEBBLoader {
aaiResourceIds.add(new Pair<>(WorkflowType.PNF, pnf.getPnfId()));
Resource pnfResource = new Resource(WorkflowType.PNF, pnf.getPnfId(), false, serviceResource);
org.onap.aai.domain.yang.Pnf aaiPnf = bbInputSetupUtils.getAAIPnf(pnf.getPnfName());
+ pnfResource.setInstanceName(pnf.getPnfName());
pnfResource.setModelCustomizationId(aaiPnf.getModelCustomizationId());
pnfResource.setModelVersionId(aaiPnf.getModelVersionId());
resourceList.add(pnfResource);