diff options
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java')
2 files changed, 89 insertions, 12 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java index 417bb4668e..e2dfedb6e4 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java @@ -42,7 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired; * * Outputs: * - AAI_CONTAINS_INFO_ABOUT_PNF - local Boolean - * - aaiContainsInfoAboutIp - local Boolean (only present if AAI_CONTAINS_INFO_ABOUT_PNF is true) + * - aaiContainsInfoAboutIp - local Boolean */ public class CheckAaiForCorrelationIdDelegate implements JavaDelegate { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index e6019f73f0..6871665ba1 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -22,17 +22,28 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.UriBuilder; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer { +public class PnfEventReadyConsumer implements Runnable, DmaapClient { + + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; private HttpClient httpClient; - private String dmaapHost; private int dmaapPort; private String dmaapProtocol; @@ -40,24 +51,57 @@ public class PnfEventReadyConsumer { private String dmaapTopicName; private String consumerId; private String consumerGroup; + private Map<String, Runnable> pnfCorrelationIdToThreadMap; + private HttpGet getRequest; + private ScheduledExecutorService executor; + private int dmaapClientInitialDelayInSeconds; + private int dmaapClientDelayInSeconds; + private boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + executor = null; + } + + public void init() { + getRequest = new HttpGet(buildURI()); + } + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + @Override + public void registerForUpdate(String correlationId, Runnable informConsumer) { + pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); + if (!dmaapThreadListenerIsRunning) { + startDmaapThreadListener(); + } } - public void notifyWhenPnfReady(String correlationId) - throws IOException { - HttpGet getRequest = new HttpGet(buildURI(consumerGroup, consumerId)); - HttpResponse response = httpClient.execute(getRequest); - checkIfResponseIsAccepted(response, correlationId); + private void startDmaapThreadListener() { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; } - private boolean checkIfResponseIsAccepted(HttpResponse response, String correlationId) { - // TODO parse response if contains proper correlationId - return false; + private void stopDmaapThreadListener() { + if (dmaapThreadListenerIsRunning) { + executor.shutdownNow(); + dmaapThreadListenerIsRunning = false; + executor = null; + } } - private URI buildURI(String consumerGroup, String consumerId) { + private URI buildURI() { return UriBuilder.fromUri(dmaapUriPathPrefix) .scheme(dmaapProtocol) .host(dmaapHost) @@ -65,6 +109,31 @@ public class PnfEventReadyConsumer { .path(consumerGroup).path(consumerId).build(); } + private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + + private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() + .ifPresent(this::informAboutPnfReady); + } + + private void informAboutPnfReady(String correlationId) { + pnfCorrelationIdToThreadMap.get(correlationId).run(); + pnfCorrelationIdToThreadMap.remove(correlationId); + + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + } + public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -93,4 +162,12 @@ public class PnfEventReadyConsumer { this.consumerGroup = consumerGroup; } + public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { + this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; + } + + public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { + this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; + } + } |