diff options
author | Lukasz Muszkieta <lukasz.muszkieta@nokia.com> | 2019-11-26 17:08:00 +0100 |
---|---|---|
committer | Lukasz Muszkieta <lukasz.muszkieta@nokia.com> | 2019-11-26 18:31:11 +0100 |
commit | 3d55c357d5263c8b81e848b0c1efae777e3f7f9e (patch) | |
tree | 248a07125f7e3799f27d2bda361a530c52608a00 /bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap | |
parent | f3aade51a88b6708233ba486bb578ad436909796 (diff) |
restoring the pnf dmaap client functionality to the proper working version
Issue-ID: SO-2339
Change-Id: I2a12517fd7b37d3260058be6c5c27865e207b861
Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
Diffstat (limited to 'bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap')
3 files changed, 19 insertions, 103 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java index a55f32aaaa..5cbd530a93 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java @@ -3,7 +3,6 @@ * ONAP - SO * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2019 Nokia. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,59 +20,24 @@ package org.onap.so.bpmn.infrastructure.pnf.delegate; -import java.util.Map; import org.camunda.bpm.engine.RuntimeService; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.onap.so.bpmn.common.recipe.ResourceInput; -import org.onap.so.bpmn.common.resource.ResourceRequestBuilder; import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Optional; @Component public class InformDmaapClient implements JavaDelegate { - private static final Logger LOGGER = LoggerFactory.getLogger(InformDmaapClient.class); private DmaapClient dmaapClient; @Override public void execute(DelegateExecution execution) { String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID); RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService(); - String processBusinessKey = execution.getProcessBusinessKey(); - dmaapClient.registerForUpdate(pnfCorrelationId, - () -> runtimeService.createMessageCorrelation("WorkflowMessage") - .processInstanceBusinessKey(processBusinessKey).correlateWithResult(), - createUpdateInfoMap(execution)); - } - - private Map<String, String> createUpdateInfoMap(DelegateExecution execution) { - Map<String, String> updateInfoMap = new HashMap<>(); - updateInfoMap.put("pnfCorrelationId", - (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID)); - getResourceInput(execution).ifPresent(resourceInput -> { - updateInfoMap.put("globalSubscriberID", resourceInput.getGlobalSubscriberId()); - updateInfoMap.put("serviceType", resourceInput.getServiceType()); - updateInfoMap.put("serviceInstanceId", resourceInput.getServiceInstanceId()); - }); - return updateInfoMap; - } - - private Optional<ResourceInput> getResourceInput(DelegateExecution execution) { - ResourceInput resourceInput = null; - if (execution.getVariable("resourceInput") != null) { - resourceInput = ResourceRequestBuilder.getJsonObject((String) execution.getVariable("resourceInput"), - ResourceInput.class); - } else { - LOGGER.warn("resourceInput value is null for correlation id: {}", - execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID)); - } - return Optional.ofNullable(resourceInput); + dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage") + .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult()); } @Autowired diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java index bafb749e15..fd7eb153b6 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/DmaapClient.java @@ -21,11 +21,9 @@ package org.onap.so.bpmn.infrastructure.pnf.dmaap; -import java.util.Map; - public interface DmaapClient { - void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, Map<String, String> updateInfo); + void registerForUpdate(String pnfCorrelationId, Runnable informConsumer); Runnable unregister(String pnfCorrelationId); } diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index bd1a45c64d..a2c73ca639 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -23,14 +23,14 @@ package org.onap.so.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.ws.rs.core.UriBuilder; import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; @@ -40,11 +40,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; -import org.onap.so.client.aai.entities.uri.AAIResourceUri; -import org.onap.so.client.aai.entities.uri.AAIUriFactory; -import org.onap.so.client.aai.AAIResourcesClient; -import org.onap.so.client.aai.AAIObjectType; -import static org.onap.so.bpmn.infrastructure.pnf.dmaap.JsonUtilForPnfCorrelationId.*; @Component public class PnfEventReadyDmaapClient implements DmaapClient { @@ -57,7 +52,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private int topicListenerDelayInSeconds; private volatile ScheduledThreadPoolExecutor executor; private volatile boolean dmaapThreadListenerIsRunning; - private volatile List<Map<String, String>> listOfUpdateInfoMap; @Autowired public PnfEventReadyDmaapClient(Environment env) { @@ -70,16 +64,11 @@ public class PnfEventReadyDmaapClient implements DmaapClient { .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(env.getProperty("pnf.dmaap.topicName")) .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId")) .build()); - listOfUpdateInfoMap = new ArrayList<>(); } @Override - public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer, - Map<String, String> updateInfo) { + public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) { logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); - synchronized (listOfUpdateInfoMap) { - listOfUpdateInfoMap.add(updateInfo); - } pnfCorrelationIdToThreadMap.put(pnfCorrelationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); @@ -90,16 +79,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public synchronized Runnable unregister(String pnfCorrelationId) { logger.debug("unregistering from pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId); Runnable runnable = pnfCorrelationIdToThreadMap.remove(pnfCorrelationId); - synchronized (listOfUpdateInfoMap) { - for (int i = listOfUpdateInfoMap.size() - 1; i >= 0; i--) { - if (!listOfUpdateInfoMap.get(i).containsKey("pnfCorrelationId")) - continue; - String id = listOfUpdateInfoMap.get(i).get("pnfCorrelationId"); - if (id != pnfCorrelationId) - continue; - listOfUpdateInfoMap.remove(i); - } - } if (pnfCorrelationIdToThreadMap.isEmpty()) { stopDmaapThreadListener(); } @@ -132,12 +111,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { try { logger.debug("dmaap listener starts listening pnf ready dmaap topic"); HttpResponse response = httpClient.execute(getRequest); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - String responseString = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - List<String> idList = parseJsonToGelAllPnfCorrelationId(responseString); - idList.stream().findFirst().ifPresent(id -> registerClientResponse(id, responseString)); - idList.forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); - } + getPnfCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfPnfCorrelationIdFound); } catch (IOException e) { logger.error("Exception caught during sending rest request to dmaap for listening event topic", e); } finally { @@ -145,6 +119,16 @@ public class PnfEventReadyDmaapClient implements DmaapClient { } } + private List<String> getPnfCorrelationIdListFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonUtilForPnfCorrelationId.parseJsonToGelAllPnfCorrelationId(responseString); + } + } + return Collections.emptyList(); + } + private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) { Runnable runnable = unregister(pnfCorrelationId); if (runnable != null) { @@ -152,36 +136,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient { runnable.run(); } } - - private void registerClientResponse(String pnfCorrelationId, String response) { - - String customerId = null; - String serviceType = null; - String serId = null; - synchronized (listOfUpdateInfoMap) { - for (Map<String, String> map : listOfUpdateInfoMap) { - if (!map.containsKey("pnfCorrelationId")) - continue; - if (pnfCorrelationId != map.get("pnfCorrelationId")) - continue; - if (!map.containsKey("globalSubscriberID")) - continue; - if (!map.containsKey("serviceType")) - continue; - if (!map.containsKey("serviceInstanceId")) - continue; - customerId = map.get("pnfCorrelationId"); - serviceType = map.get("serviceType"); - serId = map.get("serviceInstanceId"); - } - } - if (customerId == null || serviceType == null || serId == null) - return; - AAIResourcesClient client = new AAIResourcesClient(); - AAIResourceUri uri = AAIUriFactory.createResourceUri(AAIObjectType.SERVICE_INSTANCE_METADATA, customerId, - serviceType, serId); - client.update(uri, response); - } - } + } |