From 0720a8a4e336d516ee00c515a392bb48a23404fd Mon Sep 17 00:00:00 2001 From: sushant53 Date: Thu, 29 Feb 2024 11:51:05 +0530 Subject: [SO] Code improvement in bpmn-infra supporting kafka change Code improvement in bpmn-infra supporting kafka change Issue-ID: SO-4122 Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2 Signed-off-by: sushant53 --- .../delegate/CreateAndActivatePnfResourceTest.java | 20 +-- .../pnf/delegate/DmaapClientTestImpl.java | 68 -------- .../pnf/delegate/KafkaClientTestImpl.java | 67 ++++++++ .../pnf/delegate/CancelDmaapSubscription.java | 44 ----- .../pnf/delegate/CancelKafkaSubscription.java | 44 +++++ .../pnf/delegate/InformDmaapClient.java | 47 ----- .../pnf/delegate/InformKafkaClient.java | 47 +++++ .../pnf/delegate/RegisterForPnfReadyEvent.java | 12 +- .../bpmn/infrastructure/pnf/dmaap/DmaapClient.java | 29 ---- .../pnf/dmaap/JsonUtilForPnfCorrelationId.java | 69 -------- .../pnf/dmaap/PnfEventReadyDmaapClient.java | 150 ---------------- .../pnf/kafka/JsonUtilForPnfCorrelationId.java | 69 ++++++++ .../bpmn/infrastructure/pnf/kafka/KafkaClient.java | 29 ++++ .../pnf/kafka/PnfEventReadyKafkaClient.java | 150 ++++++++++++++++ .../pnf/delegate/CancelDmaapSubscriptionTest.java | 51 ------ .../pnf/delegate/CancelKafkaSubscriptionTest.java | 51 ++++++ .../pnf/delegate/DmaapClientTestImpl.java | 60 ------- .../pnf/delegate/InformDmaapClientTest.java | 91 ---------- .../pnf/delegate/InformKafkaClientTest.java | 91 ++++++++++ .../pnf/delegate/KafkaClientTestImpl.java | 60 +++++++ .../pnf/delegate/RegisterForPnfReadyEventTest.java | 22 +-- .../pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java | 79 --------- .../pnf/dmaap/PnfEventReadyDmaapClientTest.java | 191 --------------------- .../pnf/kafka/JsonUtilForPnfCorrelationIdTest.java | 79 +++++++++ .../pnf/kafka/PnfEventReadyKafkaClientTest.java | 191 +++++++++++++++++++++ .../workflow/tasks/ebb/loader/PnfEBBLoader.java | 1 + 26 files changed, 906 insertions(+), 906 deletions(-) delete mode 100644 bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java create mode 100644 bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java index 0c001b1192..32ec69e5fa 100644 --- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java +++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAndActivatePnfResourceTest.java @@ -49,7 +49,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { private PnfManagementTestImpl pnfManagementTest; @Autowired - private DmaapClientTestImpl dmaapClientTestImpl; + private KafkaClientTestImpl kafkaClientTestImpl; @Before public void setup() { @@ -60,7 +60,7 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { } @Test - public void shouldWaitForMessageFromDmaapAndUpdateAaiEntryWhenAaiEntryExists() { + public void shouldWaitForMessageFromKafkaAndUpdateAaiEntryWhenAaiEntryExists() { // given variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITH_ENTRY); ResourceInput ri = getUpdateResInputObj("OLT"); @@ -72,19 +72,19 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { // when ProcessInstance instance = runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables); - assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage"); - dmaapClientTestImpl.sendMessage(); + assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage"); + kafkaClientTestImpl.sendMessage(); // then assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs", - "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformDmaapClient", - "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); + "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "AaiEntryExists", "InformKafkaClient", + "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap()) .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITH_ENTRY)); } @Test - public void shouldCreateAaiEntryWaitForMessageFromDmaapAndUpdateAaiEntryWhenNoAaiEntryExists() { + public void shouldCreateAaiEntryWaitForMessageFromKafkaAndUpdateAaiEntryWhenNoAaiEntryExists() { // given variables.put(PNF_CORRELATION_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY); ResourceInput ri = getUpdateResInputObj("OLT"); @@ -96,13 +96,13 @@ public class CreateAndActivatePnfResourceTest extends BaseIntegrationTest { // when ProcessInstance instance = runtimeService.startProcessInstanceByKey("CreateAndActivatePnfResource", "businessKey", variables); - assertThat(instance).isWaitingAt("WaitForDmaapPnfReadyNotification").isWaitingFor("WorkflowMessage"); - dmaapClientTestImpl.sendMessage(); + assertThat(instance).isWaitingAt("WaitForKafkaPnfReadyNotification").isWaitingFor("WorkflowMessage"); + kafkaClientTestImpl.sendMessage(); // then assertThat(instance).isEnded().hasPassedInOrder("CreateAndActivatePnf_StartEvent", "CheckInputs", "CheckAiiForPnfCorrelationId", "DoesAaiContainInfoAboutPnf", "CreatePnfEntryInAai", "AaiEntryExists", - "InformDmaapClient", "WaitForDmaapPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); + "InformKafkaClient", "WaitForKafkaPnfReadyNotification", "CreateRelationId", "AaiEntryUpdated"); Assertions.assertThat(pnfManagementTest.getCreated()).containsOnlyKeys(PnfManagementTestImpl.ID_WITHOUT_ENTRY); Assertions.assertThat(pnfManagementTest.getServiceAndPnfRelationMap()) .containsOnly(MapEntry.entry(SERVICE_INSTANCE_ID, PnfManagementTestImpl.ID_WITHOUT_ENTRY)); diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java deleted file mode 100644 index 43fbc59b3d..0000000000 --- a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import java.util.Map; -import java.util.Objects; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Component; - -@Component -@Primary -public class DmaapClientTestImpl implements DmaapClient { - - private String pnfCorrelationId; - private Runnable informConsumer; - - @Override - public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - this.pnfCorrelationId = pnfCorrelationId; - this.informConsumer = informConsumer; - } - - @Override - public Runnable unregister(String pnfCorrelationId) { - if (Objects.equals(this.pnfCorrelationId, pnfCorrelationId)) { - this.pnfCorrelationId = null; - Runnable informConsumer = this.informConsumer; - this.informConsumer = null; - return informConsumer; - } - return null; - } - - public String getPnfCorrelationId() { - return pnfCorrelationId; - } - - public Runnable getInformConsumer() { - return informConsumer; - } - - public void sendMessage() { - informConsumer.run(); - } - - public boolean haveRegisteredConsumer() { - return pnfCorrelationId != null; - } -} diff --git a/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java new file mode 100644 index 0000000000..e4c1717752 --- /dev/null +++ b/bpmn/mso-infrastructure-bpmn/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import java.util.Objects; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +@Component +@Primary +public class KafkaClientTestImpl implements KafkaClient { + + private String pnfCorrelationId; + private Runnable informConsumer; + + @Override + public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { + this.pnfCorrelationId = pnfCorrelationId; + this.informConsumer = informConsumer; + } + + @Override + public Runnable unregister(String pnfCorrelationId) { + if (Objects.equals(this.pnfCorrelationId, pnfCorrelationId)) { + this.pnfCorrelationId = null; + Runnable informConsumer = this.informConsumer; + this.informConsumer = null; + return informConsumer; + } + return null; + } + + public String getPnfCorrelationId() { + return pnfCorrelationId; + } + + public Runnable getInformConsumer() { + return informConsumer; + } + + public void sendMessage() { + informConsumer.run(); + } + + public boolean haveRegisteredConsumer() { + return pnfCorrelationId != null; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java deleted file mode 100644 index 439591a295..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java +++ /dev/null @@ -1,44 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class CancelDmaapSubscription implements JavaDelegate { - - private DmaapClient dmaapClient; - - @Override - public void execute(DelegateExecution execution) { - String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - dmaapClient.unregister(pnfCorrelationId); - } - - @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java new file mode 100644 index 0000000000..d0a6e3a59f --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class CancelKafkaSubscription implements JavaDelegate { + + private KafkaClient kafkaClient; + + @Override + public void execute(DelegateExecution execution) { + String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); + kafkaClient.unregister(pnfCorrelationId); + } + + @Autowired + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java deleted file mode 100644 index 5cbd530a93..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ /dev/null @@ -1,47 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import org.camunda.bpm.engine.RuntimeService; -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class InformDmaapClient implements JavaDelegate { - - private DmaapClient dmaapClient; - - @Override - public void execute(DelegateExecution execution) { - String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") - .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); - } - - @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java new file mode 100644 index 0000000000..6506450c52 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class InformKafkaClient implements JavaDelegate { + + private KafkaClient kafkaClient; + + @Override + public void execute(DelegateExecution execution) { + String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); + RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); + kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); + } + + @Autowired + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java index 9e1b5d5a0b..ef5da92e78 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java @@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; import org.onap.so.bpmn.common.BuildingBlockExecution; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf; import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey; import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB; @@ -45,19 +45,19 @@ import org.springframework.stereotype.Component; public class RegisterForPnfReadyEvent implements JavaDelegate { private static final String ERROR_MESSAGE_PNF_NOT_FOUND = - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"; + "pnf resource not found in buildingBlockExecution while registering to kafka listener"; private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class); - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; private ExtractPojosForBB extractPojosForBB; private ExceptionBuilder exceptionBuilder; private String pnfEntryNotificationTimeout; @Autowired - public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB, + public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB, ExceptionBuilder exceptionBuilder, @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) { - this.dmaapClient = dmaapClient; + this.kafkaClient = kafkaClient; this.extractPojosForBB = extractPojosForBB; this.exceptionBuilder = exceptionBuilder; this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout; @@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate { String pnfName = getPnfName(execution); fillExecution(execution, pnfName); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceId(execution.getProcessInstanceId()).correlateWithResult()); } catch (BBObjectNotFoundException e) { LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java deleted file mode 100644 index fd7eb153b6..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2019 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -public interface DmaapClient { - - void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); - - Runnable unregister(String pnfCorrelationId); -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java deleted file mode 100644 index 9cb566f49b..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ /dev/null @@ -1,69 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * Modifications Copyright (c) 2019 Samsung - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Spliterator; - -public final class JsonUtilForPnfCorrelationId { - - private static final String JSON_PNF_CORRELATION_ID_FIELD_NAME = "correlationId"; - - private JsonUtilForPnfCorrelationId() { - throw new IllegalStateException("Utility class"); - } - - static List parseJsonToGelAllPnfCorrelationId(List list) { - if (list == null || list.isEmpty()) { - return Collections.emptyList(); - } - - List newList = new ArrayList<>(); - list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) - .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) - .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); - return newList; - } - - private static Optional handleEscapedCharacters(JsonElement jsonElement) { - if (jsonElement.isJsonObject()) { - return Optional.ofNullable(jsonElement.getAsJsonObject()); - } - return Optional.ofNullable(new JsonParser().parse(jsonElement.getAsString()).getAsJsonObject()); - } - - private static Optional getPnfCorrelationId(JsonObject jsonObject) { - if (jsonObject.has(JSON_PNF_CORRELATION_ID_FIELD_NAME)) { - return Optional.ofNullable(jsonObject.get(JSON_PNF_CORRELATION_ID_FIELD_NAME).getAsString()); - } - return Optional.empty(); - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java deleted file mode 100644 index 44b16dad28..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ /dev/null @@ -1,150 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.onap.so.client.kafka.KafkaConsumerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Component; - -@Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); - private Map pnfCorrelationIdToThreadMap; - private int topicListenerDelayInSeconds; - private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; - private KafkaConsumerImpl consumerForPnfReady; - private KafkaConsumerImpl consumerForPnfUpdate; - private String pnfReadyTopic; - private String pnfUpdateTopic; - private String consumerGroup; - private String consumerId; - private String consumerIdUpdate; - - - - @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { - pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); - executor = null; - try { - consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); - consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); - } catch (Exception e) { - throw new RuntimeException(e); - } - pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); - pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); - consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); - consumerId = env.getProperty("pnf.kafka.consumerId"); - consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); - } - - - @Override - public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); - pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); - } - } - - @Override - public synchronized Runnable unregister(String pnfCorrelationId) { - logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); - Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - consumerForPnfUpdate.close(); - consumerForPnfReady.close(); - stopDmaapThreadListener(); - } - return runnable; - } - - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { - executor = new ScheduledThreadPoolExecutor(1); - executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, - TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; - } - } - - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { - executor.shutdown(); - dmaapThreadListenerIsRunning = false; - executor = null; - } - } - - class DmaapTopicListenerThread implements Runnable { - @Override - public void run() { - try { - List response; - System.out.println(pnfUpdateTopic + " " + consumerGroup); - response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); - if (response.isEmpty()) { - response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); - getPnfCorrelationIdListFromResponse(response) - .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); - } else { - getPnfCorrelationIdListFromResponse(response) - .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); - } - } catch (IOException e) { - logger.error("Exception caught during sending rest request to kafka for listening event topic", e); - } - } - - private List getPnfCorrelationIdListFromResponse(List response) throws IOException { - if (response != null) { - return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); - } - return Collections.emptyList(); - } - - private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { - Runnable runnable = unregister(pnfCorrelationId); - if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); - runnable.run(); - } - } - } -} - diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java new file mode 100644 index 0000000000..3c65cbaad3 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * Modifications Copyright (c) 2019 Samsung + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Spliterator; + +public final class JsonUtilForPnfCorrelationId { + + private static final String JSON_PNF_CORRELATION_ID_FIELD_NAME = "correlationId"; + + private JsonUtilForPnfCorrelationId() { + throw new IllegalStateException("Utility class"); + } + + static List parseJsonToGelAllPnfCorrelationId(List list) { + if (list == null || list.isEmpty()) { + return Collections.emptyList(); + } + + List newList = new ArrayList<>(); + list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) + .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) + .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); + return newList; + } + + private static Optional handleEscapedCharacters(JsonElement jsonElement) { + if (jsonElement.isJsonObject()) { + return Optional.ofNullable(jsonElement.getAsJsonObject()); + } + return Optional.ofNullable(new JsonParser().parse(jsonElement.getAsString()).getAsJsonObject()); + } + + private static Optional getPnfCorrelationId(JsonObject jsonObject) { + if (jsonObject.has(JSON_PNF_CORRELATION_ID_FIELD_NAME)) { + return Optional.ofNullable(jsonObject.get(JSON_PNF_CORRELATION_ID_FIELD_NAME).getAsString()); + } + return Optional.empty(); + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java new file mode 100644 index 0000000000..941c565bca --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +public interface KafkaClient { + + void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); + + Runnable unregister(String pnfCorrelationId); +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java new file mode 100644 index 0000000000..0d3e0e0230 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -0,0 +1,150 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.onap.so.client.kafka.KafkaConsumerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +@Component +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); + private Map pnfCorrelationIdToThreadMap; + private int topicListenerDelayInSeconds; + private volatile ScheduledThreadPoolExecutor executor; + private volatile boolean kafkaThreadListenerIsRunning; + private KafkaConsumerImpl consumerForPnfReady; + private KafkaConsumerImpl consumerForPnfUpdate; + private String pnfReadyTopic; + private String pnfUpdateTopic; + private String consumerGroup; + private String consumerId; + private String consumerIdUpdate; + + + + @Autowired + public PnfEventReadyKafkaClient(Environment env) throws IOException { + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); + executor = null; + try { + consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + } catch (Exception e) { + throw new RuntimeException(e); + } + pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); + pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); + consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); + consumerId = env.getProperty("pnf.kafka.consumerId"); + consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); + } + + + @Override + public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { + logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); + pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); + } + } + + @Override + public synchronized Runnable unregister(String pnfCorrelationId) { + logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); + Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + consumerForPnfUpdate.close(); + consumerForPnfReady.close(); + stopKafkaThreadListener(); + } + return runnable; + } + + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { + executor = new ScheduledThreadPoolExecutor(1); + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, + TimeUnit.SECONDS); + kafkaThreadListenerIsRunning = true; + } + } + + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { + executor.shutdown(); + kafkaThreadListenerIsRunning = false; + executor = null; + } + } + + class KafkaTopicListenerThread implements Runnable { + @Override + public void run() { + try { + List response; + System.out.println(pnfUpdateTopic + " " + consumerGroup); + response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); + if (response.isEmpty()) { + response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } else { + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } + } catch (IOException e) { + logger.error("Exception caught during sending rest request to kafka for listening event topic", e); + } + } + + private List getPnfCorrelationIdListFromResponse(List response) throws IOException { + if (response != null) { + return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); + } + return Collections.emptyList(); + } + + private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { + Runnable runnable = unregister(pnfCorrelationId); + if (runnable != null) { + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + runnable.run(); + } + } + } +} + diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java deleted file mode 100644 index c2e87d57bf..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.junit.Test; - -public class CancelDmaapSubscriptionTest { - - private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId"; - - @Test - public void shouldCancelSubscription() { - // given - CancelDmaapSubscription delegate = new CancelDmaapSubscription(); - DmaapClientTestImpl dmaapClientTest = new DmaapClientTestImpl(); - delegate.setDmaapClient(dmaapClientTest); - DelegateExecution delegateExecution = mock(DelegateExecution.class); - when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) - .thenReturn(TEST_PNF_CORRELATION_ID); - when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); - dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> { - }); - // when - delegate.execute(delegateExecution); - // then - assertThat(dmaapClientTest.haveRegisteredConsumer()).isFalse(); - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java new file mode 100644 index 0000000000..6230dab3fa --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscriptionTest.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.junit.Test; + +public class CancelKafkaSubscriptionTest { + + private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId"; + + @Test + public void shouldCancelSubscription() { + // given + CancelKafkaSubscription delegate = new CancelKafkaSubscription(); + KafkaClientTestImpl kafkaClientTest = new KafkaClientTestImpl(); + delegate.setKafkaClient(kafkaClientTest); + DelegateExecution delegateExecution = mock(DelegateExecution.class); + when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) + .thenReturn(TEST_PNF_CORRELATION_ID); + when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); + kafkaClientTest.registerForUpdate("testPnfCorrelationId", () -> { + }); + // when + delegate.execute(delegateExecution); + // then + assertThat(kafkaClientTest.haveRegisteredConsumer()).isFalse(); + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java deleted file mode 100644 index 0ec0ac8214..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ /dev/null @@ -1,60 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2019 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import java.util.Objects; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; - -public class DmaapClientTestImpl implements DmaapClient { - - private String pnfCorrelationId; - private Runnable informConsumer; - - @Override - public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - this.pnfCorrelationId = pnfCorrelationId; - this.informConsumer = informConsumer; - } - - @Override - public Runnable unregister(String pnfCorrelationId) { - if (Objects.equals(this.pnfCorrelationId, pnfCorrelationId)) { - this.pnfCorrelationId = null; - Runnable informConsumer = this.informConsumer; - this.informConsumer = null; - return informConsumer; - } - return null; - } - - String getPnfCorrelationId() { - return pnfCorrelationId; - } - - Runnable getInformConsumer() { - return informConsumer; - } - - boolean haveRegisteredConsumer() { - return pnfCorrelationId != null; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java deleted file mode 100644 index 94aa1427a4..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; -import org.camunda.bpm.engine.ProcessEngineServices; -import org.camunda.bpm.engine.RuntimeService; -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; - -public class InformDmaapClientTest { - @Before - public void setUp() { - informDmaapClient = new InformDmaapClient(); - dmaapClientTest = new DmaapClientTestImpl(); - informDmaapClient.setDmaapClient(dmaapClientTest); - delegateExecution = mockDelegateExecution(); - } - - private InformDmaapClient informDmaapClient; - - private DmaapClientTestImpl dmaapClientTest; - - private DelegateExecution delegateExecution; - - private MessageCorrelationBuilder messageCorrelationBuilder; - - @Test - public void shouldSendListenerToDmaapClient() { - // when - informDmaapClient.execute(delegateExecution); - // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); - assertThat(dmaapClientTest.getInformConsumer()).isNotNull(); - verifyZeroInteractions(messageCorrelationBuilder); - } - - @Test - public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() { - // when - informDmaapClient.execute(delegateExecution); - dmaapClientTest.getInformConsumer().run(); - // then - assertThat(dmaapClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); - InOrder inOrder = inOrder(messageCorrelationBuilder); - inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey"); - inOrder.verify(messageCorrelationBuilder).correlateWithResult(); - } - - private DelegateExecution mockDelegateExecution() { - DelegateExecution delegateExecution = mock(DelegateExecution.class); - when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) - .thenReturn("testPnfCorrelationId"); - when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); - ProcessEngineServices processEngineServices = mock(ProcessEngineServices.class); - when(delegateExecution.getProcessEngineServices()).thenReturn(processEngineServices); - RuntimeService runtimeService = mock(RuntimeService.class); - when(processEngineServices.getRuntimeService()).thenReturn(runtimeService); - messageCorrelationBuilder = mock(MessageCorrelationBuilder.class); - when(runtimeService.createMessageCorrelation(any())).thenReturn(messageCorrelationBuilder); - when(messageCorrelationBuilder.processInstanceBusinessKey(any())).thenReturn(messageCorrelationBuilder); - return delegateExecution; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java new file mode 100644 index 0000000000..d8a102f2b4 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClientTest.java @@ -0,0 +1,91 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import org.camunda.bpm.engine.ProcessEngineServices; +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +public class InformKafkaClientTest { + @Before + public void setUp() { + informKafkaClient = new InformKafkaClient(); + kafkaClientTest = new KafkaClientTestImpl(); + informKafkaClient.setKafkaClient(kafkaClientTest); + delegateExecution = mockDelegateExecution(); + } + + private InformKafkaClient informKafkaClient; + + private KafkaClientTestImpl kafkaClientTest; + + private DelegateExecution delegateExecution; + + private MessageCorrelationBuilder messageCorrelationBuilder; + + @Test + public void shouldSendListenerToKafkaClient() { + // when + informKafkaClient.execute(delegateExecution); + // then + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + assertThat(kafkaClientTest.getInformConsumer()).isNotNull(); + verifyZeroInteractions(messageCorrelationBuilder); + } + + @Test + public void shouldSendListenerToKafkaClientAndSendMessageToCamunda() { + // when + informKafkaClient.execute(delegateExecution); + kafkaClientTest.getInformConsumer().run(); + // then + assertThat(kafkaClientTest.getPnfCorrelationId()).isEqualTo("testPnfCorrelationId"); + InOrder inOrder = inOrder(messageCorrelationBuilder); + inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey"); + inOrder.verify(messageCorrelationBuilder).correlateWithResult(); + } + + private DelegateExecution mockDelegateExecution() { + DelegateExecution delegateExecution = mock(DelegateExecution.class); + when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID))) + .thenReturn("testPnfCorrelationId"); + when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey"); + ProcessEngineServices processEngineServices = mock(ProcessEngineServices.class); + when(delegateExecution.getProcessEngineServices()).thenReturn(processEngineServices); + RuntimeService runtimeService = mock(RuntimeService.class); + when(processEngineServices.getRuntimeService()).thenReturn(runtimeService); + messageCorrelationBuilder = mock(MessageCorrelationBuilder.class); + when(runtimeService.createMessageCorrelation(any())).thenReturn(messageCorrelationBuilder); + when(messageCorrelationBuilder.processInstanceBusinessKey(any())).thenReturn(messageCorrelationBuilder); + return delegateExecution; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java new file mode 100644 index 0000000000..c374ba40c2 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/KafkaClientTestImpl.java @@ -0,0 +1,60 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import java.util.Objects; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; + +public class KafkaClientTestImpl implements KafkaClient { + + private String pnfCorrelationId; + private Runnable informConsumer; + + @Override + public void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { + this.pnfCorrelationId = pnfCorrelationId; + this.informConsumer = informConsumer; + } + + @Override + public Runnable unregister(String pnfCorrelationId) { + if (Objects.equals(this.pnfCorrelationId, pnfCorrelationId)) { + this.pnfCorrelationId = null; + Runnable informConsumer = this.informConsumer; + this.informConsumer = null; + return informConsumer; + } + return null; + } + + String getPnfCorrelationId() { + return pnfCorrelationId; + } + + Runnable getInformConsumer() { + return informConsumer; + } + + boolean haveRegisteredConsumer() { + return pnfCorrelationId != null; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java index 7ec05fda04..6422e5f88e 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEventTest.java @@ -29,7 +29,7 @@ public class RegisterForPnfReadyEventTest { private DelegateExecution delegateExecution; private ExtractPojosForBB extractPojosForBBMock; - private DmaapClientTestImpl dmaapClientTest; + private KafkaClientTestImpl kafkaClientTest; private MessageCorrelationBuilder messageCorrelationBuilder; private ExceptionBuilder exceptionBuilderMock; private BuildingBlockExecution buildingBlockExecution; @@ -39,7 +39,7 @@ public class RegisterForPnfReadyEventTest { @Before public void init() { delegateExecution = prepareExecution(); - dmaapClientTest = new DmaapClientTestImpl(); + kafkaClientTest = new KafkaClientTestImpl(); exceptionBuilderMock = mock(ExceptionBuilder.class); extractPojosForBBMock = mock(ExtractPojosForBB.class); buildingBlockExecution = new DelegateExecutionImpl(new HashMap<>()); @@ -47,9 +47,9 @@ public class RegisterForPnfReadyEventTest { } @Test - public void shouldRegisterForDmaapClient() throws BBObjectNotFoundException { + public void shouldRegisterForKafkaClient() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); Pnf pnf = new Pnf(); pnf.setPnfName(PNF_NAME); @@ -60,13 +60,13 @@ public class RegisterForPnfReadyEventTest { verify(delegateExecution).setVariable(ExecutionVariableNames.PNF_CORRELATION_ID, PNF_NAME); verify(delegateExecution).setVariable(ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION, PNF_ENTRY_NOTIFICATION_TIMEOUT); - checkIfInformConsumerThreadIsRunProperly(dmaapClientTest); + checkIfInformConsumerThreadIsRunProperly(kafkaClientTest); } @Test public void pnfNotFoundInBBexecution_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)) .thenThrow(BBObjectNotFoundException.class); @@ -74,13 +74,13 @@ public class RegisterForPnfReadyEventTest { testedObject.execute(delegateExecution); // then verify(exceptionBuilderMock).buildAndThrowWorkflowException(delegateExecution, 7000, - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"); + "pnf resource not found in buildingBlockExecution while registering to kafka listener"); } @Test public void pnfNameIsNull_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, PNF_ENTRY_NOTIFICATION_TIMEOUT); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when @@ -92,7 +92,7 @@ public class RegisterForPnfReadyEventTest { @Test public void pnfEventNotificationTimeoutNotSet_WorkflowExIsThrown() throws BBObjectNotFoundException { // given - testedObject = new RegisterForPnfReadyEvent(dmaapClientTest, extractPojosForBBMock, exceptionBuilderMock, null); + testedObject = new RegisterForPnfReadyEvent(kafkaClientTest, extractPojosForBBMock, exceptionBuilderMock, null); when(extractPojosForBBMock.extractByKey(buildingBlockExecution, ResourceKey.PNF)).thenReturn(new Pnf()); // when testedObject.execute(delegateExecution); @@ -101,8 +101,8 @@ public class RegisterForPnfReadyEventTest { "pnfEntryNotificationTimeout value not defined"); } - private void checkIfInformConsumerThreadIsRunProperly(DmaapClientTestImpl dmaapClientTest) { - dmaapClientTest.getInformConsumer().run(); + private void checkIfInformConsumerThreadIsRunProperly(KafkaClientTestImpl kafkaClientTest) { + kafkaClientTest.getInformConsumer().run(); InOrder inOrder = inOrder(messageCorrelationBuilder); inOrder.verify(messageCorrelationBuilder).processInstanceId(PROCESS_INSTANCE_ID); inOrder.verify(messageCorrelationBuilder).correlateWithResult(); diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java deleted file mode 100644 index f9e4cb4c88..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationIdTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; -import java.util.List; -import org.junit.Test; - -public class JsonUtilForPnfCorrelationIdTest { - private static final List LIST_EXAMPLE_WITH_PNF_CORRELATION_ID = new ArrayList<>(); - private static final List LIST_WITH_ONE_PNF_CORRELATION_ID = new ArrayList<>(); - private static final List LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = new ArrayList<>(); - private static final List LIST_WITH_NO_PNF_CORRELATION_ID = new ArrayList<>(); - - static { - LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest1\",\"key1\":\"value1\"}"); - LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}"); - LIST_WITH_ONE_PNF_CORRELATION_ID.add("{\"correlationId\":\"corrTest3\"}"); - LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\""); - LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest5\\\"}\""); - LIST_WITH_NO_PNF_CORRELATION_ID.add("{\"key1\":\"value1\"}"); - } - - @Test - public void parseJsonSuccessful() { - List expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_EXAMPLE_WITH_PNF_CORRELATION_ID); - assertThat(expectedResult).containsExactly("corrTest1", "corrTest2"); - - List expectedResult2 = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_ONE_PNF_CORRELATION_ID); - assertThat(expectedResult2).containsExactly("corrTest3"); - } - - @Test - public void parseJsonWithEscapeCharacters_Successful() { - List expectedResult = JsonUtilForPnfCorrelationId - .parseJsonToGelAllPnfCorrelationId(LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); - assertThat(expectedResult).containsExactly("corrTest4", "corrTest5"); - } - - @Test - public void parseJson_emptyListReturnedWhenNothingFound() { - List expectedResult = - JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_NO_PNF_CORRELATION_ID); - assertThat(expectedResult).isEmpty(); - } - - @Test - public void shouldReturnEmptyListWhenInputIsNull() { - assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); - } - - @Test - public void shouldReturnEmptyListWhenInputIsEmpty() { - assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java deleted file mode 100644 index 546e644fbd..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; -import org.onap.so.client.kafka.KafkaConsumerImpl; -import org.springframework.core.env.Environment; - - -@RunWith(MockitoJUnitRunner.class) -public class PnfEventReadyDmaapClientTest { - private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; - private static final String PNF_CORRELATION_ID = "corrTestId"; - private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; - private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = - {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}", - "{\"correlationId\": \"corr\",\"value\":\"value2\"}"}; - - private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}"; - private static final String TOPIC_NAME = "unauthenticated.PNF_READY"; - private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE"; - private static final String CONSUMER_ID = "so-bpmn-infra-pnfready"; - private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate"; - private static final String CONSUMER_GROUP = "so-consumer"; - private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5; - - @Mock - private Environment env; - private PnfEventReadyDmaapClient testedObject; - - private DmaapTopicListenerThread testedObjectInnerClassThread; - private KafkaConsumerImpl kafkaConsumerMock; - private Runnable threadMockToNotifyCamundaFlow; - private ScheduledThreadPoolExecutor executorMock; - - @Before - public void init() throws NoSuchFieldException, IllegalAccessException, IOException { - when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS); - when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); - when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); - when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); - when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); - when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); - when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) - .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); - testedObject = new PnfEventReadyDmaapClient(env); - testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); - kafkaConsumerMock = mock(KafkaConsumerImpl.class); - threadMockToNotifyCamundaFlow = mock(Runnable.class); - executorMock = mock(ScheduledThreadPoolExecutor.class); - setPrivateField(); - } - - /** - * Test run method, where the are following conditions: - *

- * - DmaapThreadListener is running, flag is set to true - *

- * - map is filled with one entry with the key that we get from response - *

- * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty) - * and shutdown the executor because of empty map - */ - @Test - public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException { - when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) - .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), - JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); - testedObjectInnerClassThread.run(); - verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE); - verify(threadMockToNotifyCamundaFlow).run(); - verify(executorMock).shutdown(); - } - - - @Test - public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException { - when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) - .thenReturn(Collections.emptyList()) - .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), - JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); - testedObjectInnerClassThread.run(); - verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID); - - verify(threadMockToNotifyCamundaFlow).run(); - verify(executorMock).shutdown(); - } - - - /** - * Test run method, where the are following conditions: - *

- * - DmaapThreadListener is running, flag is set to true - *

- * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http - * response. run method should not do anything with the map not run any thread to notify camunda process - */ - @Test - public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException { - when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP), - JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); - testedObjectInnerClassThread.run(); - verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); - } - - /** - * Test run method, where the are following conditions: - *

- * - DmaapThreadListener is running, flag is set to true - *

- * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run - * method should not do anything with the map and not run any thread to notify camunda process - */ - @Test - public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException { - when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) - .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); - testedObjectInnerClassThread.run(); - verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); - } - - private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { - Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady"); - consumerForPnfReadyField.setAccessible(true); - consumerForPnfReadyField.set(testedObject, kafkaConsumerMock); - consumerForPnfReadyField.setAccessible(false); - - Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate"); - consumerForPnfUpdateField.setAccessible(true); - consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock); - consumerForPnfUpdateField.setAccessible(false); - - Field executorField = testedObject.getClass().getDeclaredField("executor"); - executorField.setAccessible(true); - executorField.set(testedObject, executorMock); - executorField.setAccessible(false); - - Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap"); - pnfCorrelationToThreadMapField.setAccessible(true); - Map pnfCorrelationToThreadMap = new ConcurrentHashMap<>(); - pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow); - pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap); - - Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning"); - threadRunFlag.setAccessible(true); - threadRunFlag.set(testedObject, true); - threadRunFlag.setAccessible(false); - } - -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java new file mode 100644 index 0000000000..71d60305e4 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationIdTest.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; + +public class JsonUtilForPnfCorrelationIdTest { + private static final List LIST_EXAMPLE_WITH_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List LIST_WITH_ONE_PNF_CORRELATION_ID = new ArrayList<>(); + private static final List LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS = new ArrayList<>(); + private static final List LIST_WITH_NO_PNF_CORRELATION_ID = new ArrayList<>(); + + static { + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest1\",\"key1\":\"value1\"}"); + LIST_EXAMPLE_WITH_PNF_CORRELATION_ID.add("{\"correlationId\": \"corrTest2\",\"key2\":\"value2\"}"); + LIST_WITH_ONE_PNF_CORRELATION_ID.add("{\"correlationId\":\"corrTest3\"}"); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest4\\\"}\""); + LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS.add("\"{\\\"correlationId\\\":\\\"corrTest5\\\"}\""); + LIST_WITH_NO_PNF_CORRELATION_ID.add("{\"key1\":\"value1\"}"); + } + + @Test + public void parseJsonSuccessful() { + List expectedResult = + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_EXAMPLE_WITH_PNF_CORRELATION_ID); + assertThat(expectedResult).containsExactly("corrTest1", "corrTest2"); + + List expectedResult2 = + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_ONE_PNF_CORRELATION_ID); + assertThat(expectedResult2).containsExactly("corrTest3"); + } + + @Test + public void parseJsonWithEscapeCharacters_Successful() { + List expectedResult = JsonUtilForPnfCorrelationId + .parseJsonToGelAllPnfCorrelationId(LIST_WITH_TWO_PNF_CORRELATION_ID_AND_ESCAPED_CHARACTERS); + assertThat(expectedResult).containsExactly("corrTest4", "corrTest5"); + } + + @Test + public void parseJson_emptyListReturnedWhenNothingFound() { + List expectedResult = + JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(LIST_WITH_NO_PNF_CORRELATION_ID); + assertThat(expectedResult).isEmpty(); + } + + @Test + public void shouldReturnEmptyListWhenInputIsNull() { + assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); + } + + @Test + public void shouldReturnEmptyListWhenInputIsEmpty() { + assertThat(JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(null)).isEmpty(); + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java new file mode 100644 index 0000000000..41887e3586 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClientTest.java @@ -0,0 +1,191 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread; +import org.onap.so.client.kafka.KafkaConsumerImpl; +import org.springframework.core.env.Environment; + + +@RunWith(MockitoJUnitRunner.class) +public class PnfEventReadyKafkaClientTest { + private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String PNF_CORRELATION_ID = "corrTestId"; + private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; + private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = + {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}", + "{\"correlationId\": \"corr\",\"value\":\"value2\"}"}; + + private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}"; + private static final String TOPIC_NAME = "unauthenticated.PNF_READY"; + private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE"; + private static final String CONSUMER_ID = "so-bpmn-infra-pnfready"; + private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate"; + private static final String CONSUMER_GROUP = "so-consumer"; + private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5; + + @Mock + private Environment env; + private PnfEventReadyKafkaClient testedObject; + + private KafkaTopicListenerThread testedObjectInnerClassThread; + private KafkaConsumerImpl kafkaConsumerMock; + private Runnable threadMockToNotifyCamundaFlow; + private ScheduledThreadPoolExecutor executorMock; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException, IOException { + when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS); + when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); + when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); + when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); + when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class))) + .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); + testedObject = new PnfEventReadyKafkaClient(env); + testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread(); + kafkaConsumerMock = mock(KafkaConsumerImpl.class); + threadMockToNotifyCamundaFlow = mock(Runnable.class); + executorMock = mock(ScheduledThreadPoolExecutor.class); + setPrivateField(); + } + + /** + * Test run method, where the are following conditions: + *

+ * - KafkaThreadListener is running, flag is set to true + *

+ * - map is filled with one entry with the key that we get from response + *

+ * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty) + * and shutdown the executor because of empty map + */ + @Test + public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException { + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); + testedObjectInnerClassThread.run(); + verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE); + verify(threadMockToNotifyCamundaFlow).run(); + verify(executorMock).shutdown(); + } + + + @Test + public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException { + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Collections.emptyList()) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); + testedObjectInnerClassThread.run(); + verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID); + + verify(threadMockToNotifyCamundaFlow).run(); + verify(executorMock).shutdown(); + } + + + /** + * Test run method, where the are following conditions: + *

+ * - KafkaThreadListener is running, flag is set to true + *

+ * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http + * response. run method should not do anything with the map not run any thread to notify camunda process + */ + @Test + public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException { + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList( + String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); + testedObjectInnerClassThread.run(); + verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); + } + + /** + * Test run method, where the are following conditions: + *

+ * - KafkaThreadListener is running, flag is set to true + *

+ * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run + * method should not do anything with the map and not run any thread to notify camunda process + */ + @Test + public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException { + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); + testedObjectInnerClassThread.run(); + verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); + } + + private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { + Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady"); + consumerForPnfReadyField.setAccessible(true); + consumerForPnfReadyField.set(testedObject, kafkaConsumerMock); + consumerForPnfReadyField.setAccessible(false); + + Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate"); + consumerForPnfUpdateField.setAccessible(true); + consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock); + consumerForPnfUpdateField.setAccessible(false); + + Field executorField = testedObject.getClass().getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(testedObject, executorMock); + executorField.setAccessible(false); + + Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap"); + pnfCorrelationToThreadMapField.setAccessible(true); + Map pnfCorrelationToThreadMap = new ConcurrentHashMap<>(); + pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow); + pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap); + + Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning"); + threadRunFlag.setAccessible(true); + threadRunFlag.set(testedObject, true); + threadRunFlag.setAccessible(false); + } + +} diff --git a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java index 761219c6be..a44093298d 100644 --- a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java +++ b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/workflow/tasks/ebb/loader/PnfEBBLoader.java @@ -82,6 +82,7 @@ public class PnfEBBLoader { aaiResourceIds.add(new Pair<>(WorkflowType.PNF, pnf.getPnfId())); Resource pnfResource = new Resource(WorkflowType.PNF, pnf.getPnfId(), false, serviceResource); org.onap.aai.domain.yang.Pnf aaiPnf = bbInputSetupUtils.getAAIPnf(pnf.getPnfName()); + pnfResource.setInstanceName(pnf.getPnfName()); pnfResource.setModelCustomizationId(aaiPnf.getModelCustomizationId()); pnfResource.setModelVersionId(aaiPnf.getModelVersionId()); resourceList.add(pnfResource); -- cgit 1.2.3-korg