From 0720a8a4e336d516ee00c515a392bb48a23404fd Mon Sep 17 00:00:00 2001 From: sushant53 Date: Thu, 29 Feb 2024 11:51:05 +0530 Subject: [SO] Code improvement in bpmn-infra supporting kafka change Code improvement in bpmn-infra supporting kafka change Issue-ID: SO-4122 Change-Id: I3924418d16f8f6d9270278f1894e224a216d1cf2 Signed-off-by: sushant53 --- .../pnf/delegate/CancelDmaapSubscription.java | 44 ------ .../pnf/delegate/CancelKafkaSubscription.java | 44 ++++++ .../pnf/delegate/InformDmaapClient.java | 47 ------- .../pnf/delegate/InformKafkaClient.java | 47 +++++++ .../pnf/delegate/RegisterForPnfReadyEvent.java | 12 +- .../bpmn/infrastructure/pnf/dmaap/DmaapClient.java | 29 ---- .../pnf/dmaap/JsonUtilForPnfCorrelationId.java | 69 ---------- .../pnf/dmaap/PnfEventReadyDmaapClient.java | 150 --------------------- .../pnf/kafka/JsonUtilForPnfCorrelationId.java | 69 ++++++++++ .../bpmn/infrastructure/pnf/kafka/KafkaClient.java | 29 ++++ .../pnf/kafka/PnfEventReadyKafkaClient.java | 150 +++++++++++++++++++++ 11 files changed, 345 insertions(+), 345 deletions(-) delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java delete mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java create mode 100644 bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java (limited to 'bpmn/so-bpmn-infrastructure-common/src/main/java/org') diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java deleted file mode 100644 index 439591a295..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java +++ /dev/null @@ -1,44 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class CancelDmaapSubscription implements JavaDelegate { - - private DmaapClient dmaapClient; - - @Override - public void execute(DelegateExecution execution) { - String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - dmaapClient.unregister(pnfCorrelationId); - } - - @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java new file mode 100644 index 0000000000..d0a6e3a59f --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelKafkaSubscription.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class CancelKafkaSubscription implements JavaDelegate { + + private KafkaClient kafkaClient; + + @Override + public void execute(DelegateExecution execution) { + String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); + kafkaClient.unregister(pnfCorrelationId); + } + + @Autowired + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java deleted file mode 100644 index 5cbd530a93..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ /dev/null @@ -1,47 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.delegate; - -import org.camunda.bpm.engine.RuntimeService; -import org.camunda.bpm.engine.delegate.DelegateExecution; -import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class InformDmaapClient implements JavaDelegate { - - private DmaapClient dmaapClient; - - @Override - public void execute(DelegateExecution execution) { - String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); - RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") - .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); - } - - @Autowired - public void setDmaapClient(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java new file mode 100644 index 0000000000..6506450c52 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformKafkaClient.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.delegate; + +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class InformKafkaClient implements JavaDelegate { + + private KafkaClient kafkaClient; + + @Override + public void execute(DelegateExecution execution) { + String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); + RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); + kafkaClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); + } + + @Autowired + public void setKafkaClient(KafkaClient kafkaClient) { + this.kafkaClient = kafkaClient; + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java index 9e1b5d5a0b..ef5da92e78 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/RegisterForPnfReadyEvent.java @@ -26,7 +26,7 @@ import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; import org.onap.so.bpmn.common.BuildingBlockExecution; -import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.onap.so.bpmn.infrastructure.pnf.kafka.KafkaClient; import org.onap.so.bpmn.servicedecomposition.bbobjects.Pnf; import org.onap.so.bpmn.servicedecomposition.entities.ResourceKey; import org.onap.so.bpmn.servicedecomposition.tasks.ExtractPojosForBB; @@ -45,19 +45,19 @@ import org.springframework.stereotype.Component; public class RegisterForPnfReadyEvent implements JavaDelegate { private static final String ERROR_MESSAGE_PNF_NOT_FOUND = - "pnf resource not found in buildingBlockExecution while registering to dmaap listener"; + "pnf resource not found in buildingBlockExecution while registering to kafka listener"; private static final Logger LOGGER = LoggerFactory.getLogger(RegisterForPnfReadyEvent.class); - private DmaapClient dmaapClient; + private KafkaClient kafkaClient; private ExtractPojosForBB extractPojosForBB; private ExceptionBuilder exceptionBuilder; private String pnfEntryNotificationTimeout; @Autowired - public RegisterForPnfReadyEvent(DmaapClient dmaapClient, ExtractPojosForBB extractPojosForBB, + public RegisterForPnfReadyEvent(KafkaClient kafkaClient, ExtractPojosForBB extractPojosForBB, ExceptionBuilder exceptionBuilder, @Value("${aai.pnfEntryNotificationTimeout}") String pnfEntryNotificationTimeout) { - this.dmaapClient = dmaapClient; + this.kafkaClient = kafkaClient; this.extractPojosForBB = extractPojosForBB; this.exceptionBuilder = exceptionBuilder; this.pnfEntryNotificationTimeout = pnfEntryNotificationTimeout; @@ -69,7 +69,7 @@ public class RegisterForPnfReadyEvent implements JavaDelegate { String pnfName = getPnfName(execution); fillExecution(execution, pnfName); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - dmaapClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + kafkaClient.registerForUpdate(pnfName, () -> runtimeService.createMessageCorrelation("WorkflowMessage") .processInstanceId(execution.getProcessInstanceId()).correlateWithResult()); } catch (BBObjectNotFoundException e) { LOGGER.error(ERROR_MESSAGE_PNF_NOT_FOUND); diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java deleted file mode 100644 index fd7eb153b6..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ /dev/null @@ -1,29 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2019 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -public interface DmaapClient { - - void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); - - Runnable unregister(String pnfCorrelationId); -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java deleted file mode 100644 index 9cb566f49b..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/JsonUtilForPnfCorrelationId.java +++ /dev/null @@ -1,69 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * Modifications Copyright (c) 2019 Samsung - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Spliterator; - -public final class JsonUtilForPnfCorrelationId { - - private static final String JSON_PNF_CORRELATION_ID_FIELD_NAME = "correlationId"; - - private JsonUtilForPnfCorrelationId() { - throw new IllegalStateException("Utility class"); - } - - static List parseJsonToGelAllPnfCorrelationId(List list) { - if (list == null || list.isEmpty()) { - return Collections.emptyList(); - } - - List newList = new ArrayList<>(); - list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) - .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) - .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); - return newList; - } - - private static Optional handleEscapedCharacters(JsonElement jsonElement) { - if (jsonElement.isJsonObject()) { - return Optional.ofNullable(jsonElement.getAsJsonObject()); - } - return Optional.ofNullable(new JsonParser().parse(jsonElement.getAsString()).getAsJsonObject()); - } - - private static Optional getPnfCorrelationId(JsonObject jsonObject) { - if (jsonObject.has(JSON_PNF_CORRELATION_ID_FIELD_NAME)) { - return Optional.ofNullable(jsonObject.get(JSON_PNF_CORRELATION_ID_FIELD_NAME).getAsString()); - } - return Optional.empty(); - } -} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java deleted file mode 100644 index 44b16dad28..0000000000 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ /dev/null @@ -1,150 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2018 Nokia. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.so.bpmn.infrastructure.pnf.dmaap; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.onap.so.client.kafka.KafkaConsumerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Component; - -@Component -public class PnfEventReadyDmaapClient implements DmaapClient { - private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyDmaapClient.class); - private Map pnfCorrelationIdToThreadMap; - private int topicListenerDelayInSeconds; - private volatile ScheduledThreadPoolExecutor executor; - private volatile boolean dmaapThreadListenerIsRunning; - private KafkaConsumerImpl consumerForPnfReady; - private KafkaConsumerImpl consumerForPnfUpdate; - private String pnfReadyTopic; - private String pnfUpdateTopic; - private String consumerGroup; - private String consumerId; - private String consumerIdUpdate; - - - - @Autowired - public PnfEventReadyDmaapClient(Environment env) throws IOException { - pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class); - executor = null; - try { - consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); - consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); - } catch (Exception e) { - throw new RuntimeException(e); - } - pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); - pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); - consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); - consumerId = env.getProperty("pnf.kafka.consumerId"); - consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); - } - - - @Override - public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { - logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); - pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); - } - } - - @Override - public synchronized Runnable unregister(String pnfCorrelationId) { - logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); - Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - consumerForPnfUpdate.close(); - consumerForPnfReady.close(); - stopDmaapThreadListener(); - } - return runnable; - } - - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { - executor = new ScheduledThreadPoolExecutor(1); - executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, topicListenerDelayInSeconds, - TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; - } - } - - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { - executor.shutdown(); - dmaapThreadListenerIsRunning = false; - executor = null; - } - } - - class DmaapTopicListenerThread implements Runnable { - @Override - public void run() { - try { - List response; - System.out.println(pnfUpdateTopic + " " + consumerGroup); - response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); - if (response.isEmpty()) { - response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); - getPnfCorrelationIdListFromResponse(response) - .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); - } else { - getPnfCorrelationIdListFromResponse(response) - .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); - } - } catch (IOException e) { - logger.error("Exception caught during sending rest request to kafka for listening event topic", e); - } - } - - private List getPnfCorrelationIdListFromResponse(List response) throws IOException { - if (response != null) { - return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); - } - return Collections.emptyList(); - } - - private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { - Runnable runnable = unregister(pnfCorrelationId); - if (runnable != null) { - logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); - runnable.run(); - } - } - } -} - diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java new file mode 100644 index 0000000000..3c65cbaad3 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/JsonUtilForPnfCorrelationId.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * Modifications Copyright (c) 2019 Samsung + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Spliterator; + +public final class JsonUtilForPnfCorrelationId { + + private static final String JSON_PNF_CORRELATION_ID_FIELD_NAME = "correlationId"; + + private JsonUtilForPnfCorrelationId() { + throw new IllegalStateException("Utility class"); + } + + static List parseJsonToGelAllPnfCorrelationId(List list) { + if (list == null || list.isEmpty()) { + return Collections.emptyList(); + } + + List newList = new ArrayList<>(); + list.forEach(je -> handleEscapedCharacters(new JsonParser().parse(je)) + .ifPresent(jsonObject -> getPnfCorrelationId(jsonObject) + .ifPresent(pnfCorrelationId -> newList.add(pnfCorrelationId)))); + return newList; + } + + private static Optional handleEscapedCharacters(JsonElement jsonElement) { + if (jsonElement.isJsonObject()) { + return Optional.ofNullable(jsonElement.getAsJsonObject()); + } + return Optional.ofNullable(new JsonParser().parse(jsonElement.getAsString()).getAsJsonObject()); + } + + private static Optional getPnfCorrelationId(JsonObject jsonObject) { + if (jsonObject.has(JSON_PNF_CORRELATION_ID_FIELD_NAME)) { + return Optional.ofNullable(jsonObject.get(JSON_PNF_CORRELATION_ID_FIELD_NAME).getAsString()); + } + return Optional.empty(); + } +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java new file mode 100644 index 0000000000..941c565bca --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/KafkaClient.java @@ -0,0 +1,29 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +public interface KafkaClient { + + void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); + + Runnable unregister(String pnfCorrelationId); +} diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java new file mode 100644 index 0000000000..0d3e0e0230 --- /dev/null +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/kafka/PnfEventReadyKafkaClient.java @@ -0,0 +1,150 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2018 Nokia. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.so.bpmn.infrastructure.pnf.kafka; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.onap.so.client.kafka.KafkaConsumerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +@Component +public class PnfEventReadyKafkaClient implements KafkaClient { + private static final Logger logger = LoggerFactory.getLogger(PnfEventReadyKafkaClient.class); + private Map pnfCorrelationIdToThreadMap; + private int topicListenerDelayInSeconds; + private volatile ScheduledThreadPoolExecutor executor; + private volatile boolean kafkaThreadListenerIsRunning; + private KafkaConsumerImpl consumerForPnfReady; + private KafkaConsumerImpl consumerForPnfUpdate; + private String pnfReadyTopic; + private String pnfUpdateTopic; + private String consumerGroup; + private String consumerId; + private String consumerIdUpdate; + + + + @Autowired + public PnfEventReadyKafkaClient(Environment env) throws IOException { + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + topicListenerDelayInSeconds = env.getProperty("pnf.kafka.topicListenerDelayInSeconds", Integer.class); + executor = null; + try { + consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers")); + } catch (Exception e) { + throw new RuntimeException(e); + } + pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName"); + pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName"); + consumerGroup = env.getProperty("pnf.kafka.consumerGroup"); + consumerId = env.getProperty("pnf.kafka.consumerId"); + consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate"); + } + + + @Override + public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { + logger.debug("registering for pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); + pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); + if (!kafkaThreadListenerIsRunning) { + startKafkaThreadListener(); + } + } + + @Override + public synchronized Runnable unregister(String pnfCorrelationId) { + logger.debug("unregistering from pnf ready kafka event for pnf correlation id: {}", pnfCorrelationId); + Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + consumerForPnfUpdate.close(); + consumerForPnfReady.close(); + stopKafkaThreadListener(); + } + return runnable; + } + + private synchronized void startKafkaThreadListener() { + if (!kafkaThreadListenerIsRunning) { + executor = new ScheduledThreadPoolExecutor(1); + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + executor.scheduleWithFixedDelay(new KafkaTopicListenerThread(), 0, topicListenerDelayInSeconds, + TimeUnit.SECONDS); + kafkaThreadListenerIsRunning = true; + } + } + + private synchronized void stopKafkaThreadListener() { + if (kafkaThreadListenerIsRunning) { + executor.shutdown(); + kafkaThreadListenerIsRunning = false; + executor = null; + } + } + + class KafkaTopicListenerThread implements Runnable { + @Override + public void run() { + try { + List response; + System.out.println(pnfUpdateTopic + " " + consumerGroup); + response = consumerForPnfUpdate.get(pnfUpdateTopic, consumerGroup, consumerIdUpdate); + if (response.isEmpty()) { + response = consumerForPnfReady.get(pnfReadyTopic, consumerGroup, consumerId); + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } else { + getPnfCorrelationIdListFromResponse(response) + .forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); + } + } catch (IOException e) { + logger.error("Exception caught during sending rest request to kafka for listening event topic", e); + } + } + + private List getPnfCorrelationIdListFromResponse(List response) throws IOException { + if (response != null) { + return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(response); + } + return Collections.emptyList(); + } + + private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { + Runnable runnable = unregister(pnfCorrelationId); + if (runnable != null) { + logger.debug("kafka listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId); + runnable.run(); + } + } + } +} + -- cgit 1.2.3-korg