aboutsummaryrefslogtreecommitdiffstats
path: root/bpmn/so-bpmn-infrastructure-common/src/main
diff options
context:
space:
mode:
authorLukasz Muszkieta <lukasz.muszkieta@nokia.com>2019-11-26 17:08:00 +0100
committerLukasz Muszkieta <lukasz.muszkieta@nokia.com>2019-11-26 18:31:11 +0100
commit3d55c357d5263c8b81e848b0c1efae777e3f7f9e (patch)
tree248a07125f7e3799f27d2bda361a530c52608a00 /bpmn/so-bpmn-infrastructure-common/src/main
parentf3aade51a88b6708233ba486bb578ad436909796 (diff)
restoring the pnf dmaap client functionality to the proper working version
Issue-ID: SO-2339 Change-Id: I2a12517fd7b37d3260058be6c5c27865e207b861 Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
Diffstat (limited to 'bpmn/so-bpmn-infrastructure-common/src/main')
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java40
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java4
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java78
3 files changed, 19 insertions, 103 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
index a55f32aaaa..5cbd530a93 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
@@ -3,7 +3,6 @@
* ONAP - SO
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2019 Nokia.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,59 +20,24 @@
package org.onap.so.bpmn.infrastructure.pnf.delegate;
-import java.util.Map;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.common.recipe.ResourceInput;
-import org.onap.so.bpmn.common.resource.ResourceRequestBuilder;
import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.HashMap;
-import java.util.Optional;
@Component
public class InformDmaapClient implements JavaDelegate {
- private static final Logger LOGGER = LoggerFactory.getLogger(InformDmaapClient.class);
private DmaapClient dmaapClient;
@Override
public void execute(DelegateExecution execution) {
String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
- String processBusinessKey = execution.getProcessBusinessKey();
- dmaapClient.registerForUpdate(pnfCorrelationId,
- () -> runtimeService.createMessageCorrelation("WorkflowMessage")
- .processInstanceBusinessKey(processBusinessKey).correlateWithResult(),
- createUpdateInfoMap(execution));
- }
-
- private Map<String, String> createUpdateInfoMap(DelegateExecution execution) {
- Map<String, String> updateInfoMap = new HashMap<>();
- updateInfoMap.put("pnfCorrelationId",
- (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID));
- getResourceInput(execution).ifPresent(resourceInput -> {
- updateInfoMap.put("globalSubscriberID", resourceInput.getGlobalSubscriberId());
- updateInfoMap.put("serviceType", resourceInput.getServiceType());
- updateInfoMap.put("serviceInstanceId", resourceInput.getServiceInstanceId());
- });
- return updateInfoMap;
- }
-
- private Optional<ResourceInput> getResourceInput(DelegateExecution execution) {
- ResourceInput resourceInput = null;
- if (execution.getVariable("resourceInput") != null) {
- resourceInput = ResourceRequestBuilder.getJsonObject((String) execution.getVariable("resourceInput"),
- ResourceInput.class);
- } else {
- LOGGER.warn("resourceInput value is null for correlation id: {}",
- execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID));
- }
- return Optional.ofNullable(resourceInput);
+ dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
+ .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
}
@Autowired
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
index bafb749e15..fd7eb153b6 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
@@ -21,11 +21,9 @@
package org.onap.so.bpmn.infrastructure.pnf.dmaap;
-import java.util.Map;
-
public interface DmaapClient {
- void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, Map<String, String> updateInfo);
+ void registerForUpdate(String pnfCorrelationId, Runnable informConsumer);
Runnable unregister(String pnfCorrelationId);
}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index bd1a45c64d..a2c73ca639 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -23,14 +23,14 @@
package org.onap.so.bpmn.infrastructure.pnf.dmaap;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.UriBuilder;
import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
@@ -40,11 +40,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
-import org.onap.so.client.aai.entities.uri.AAIResourceUri;
-import org.onap.so.client.aai.entities.uri.AAIUriFactory;
-import org.onap.so.client.aai.AAIResourcesClient;
-import org.onap.so.client.aai.AAIObjectType;
-import static org.onap.so.bpmn.infrastructure.pnf.dmaap.JsonUtilForPnfCorrelationId.*;
@Component
public class PnfEventReadyDmaapClient implements DmaapClient {
@@ -57,7 +52,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
private volatile boolean dmaapThreadListenerIsRunning;
- private volatile List<Map<String, String>> listOfUpdateInfoMap;
@Autowired
public PnfEventReadyDmaapClient(Environment env) {
@@ -70,16 +64,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
.port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName"))
.path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
.build());
- listOfUpdateInfoMap = new ArrayList<>();
}
@Override
- public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer,
- Map<String, String> updateInfo) {
+ public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
- synchronized (listOfUpdateInfoMap) {
- listOfUpdateInfoMap.add(updateInfo);
- }
pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer);
if (!dmaapThreadListenerIsRunning) {
startDmaapThreadListener();
@@ -90,16 +79,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
public synchronized Runnable unregister(String pnfCorrelationId) {
logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId);
- synchronized (listOfUpdateInfoMap) {
- for (int i = listOfUpdateInfoMap.size() - 1; i >= 0; i--) {
- if (!listOfUpdateInfoMap.get(i).containsKey("pnfCorrelationId"))
- continue;
- String id = listOfUpdateInfoMap.get(i).get("pnfCorrelationId");
- if (id != pnfCorrelationId)
- continue;
- listOfUpdateInfoMap.remove(i);
- }
- }
if (pnfCorrelationIdToThreadMap.isEmpty()) {
stopDmaapThreadListener();
}
@@ -132,12 +111,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
try {
logger.debug("dmaap listener starts listening pnf ready dmaap topic");
HttpResponse response = httpClient.execute(getRequest);
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- String responseString = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
- List<String> idList = parseJsonToGelAllPnfCorrelationId(responseString);
- idList.stream().findFirst().ifPresent(id -> registerClientResponse(id, responseString));
- idList.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
- }
+ getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound);
} catch (IOException e) {
logger.error("Exception caught during sending rest request to dmaap for listening event topic", e);
} finally {
@@ -145,6 +119,16 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
}
}
+ private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException {
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (responseString != null) {
+ return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString);
+ }
+ }
+ return Collections.emptyList();
+ }
+
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
Runnable runnable = unregister(pnfCorrelationId);
if (runnable != null) {
@@ -152,36 +136,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
runnable.run();
}
}
-
- private void registerClientResponse(String pnfCorrelationId, String response) {
-
- String customerId = null;
- String serviceType = null;
- String serId = null;
- synchronized (listOfUpdateInfoMap) {
- for (Map<String, String> map : listOfUpdateInfoMap) {
- if (!map.containsKey("pnfCorrelationId"))
- continue;
- if (pnfCorrelationId != map.get("pnfCorrelationId"))
- continue;
- if (!map.containsKey("globalSubscriberID"))
- continue;
- if (!map.containsKey("serviceType"))
- continue;
- if (!map.containsKey("serviceInstanceId"))
- continue;
- customerId = map.get("pnfCorrelationId");
- serviceType = map.get("serviceType");
- serId = map.get("serviceInstanceId");
- }
- }
- if (customerId == null || serviceType == null || serId == null)
- return;
- AAIResourcesClient client = new AAIResourcesClient();
- AAIResourceUri uri = AAIUriFactory.createResourceUri(AAIObjectType.SERVICE_INSTANCE_METADATA, customerId,
- serviceType, serId);
- client.update(uri, response);
- }
-
}
+
}