From b196509debdb43411abfcdc55b3ce87a0a30da6e Mon Sep 17 00:00:00 2001 From: Lukasz Muszkieta Date: Thu, 12 Apr 2018 18:24:12 +0200 Subject: PnfReadyEventConsumer implementation Change-Id: I7252400a3f60ca22ddfa71edb28eaf1d16ccd9b4 Issue-ID: SO-466 Signed-off-by: Lukasz Muszkieta --- .../pnf/dmaap/PnfEventReadyConsumer.java | 99 +++++++++++++++++++--- 1 file changed, 88 insertions(+), 11 deletions(-) (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java') diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index e6019f73f0..6871665ba1 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -22,17 +22,28 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.UriBuilder; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer { +public class PnfEventReadyConsumer implements Runnable, DmaapClient { + + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; private HttpClient httpClient; - private String dmaapHost; private int dmaapPort; private String dmaapProtocol; @@ -40,24 +51,57 @@ public class PnfEventReadyConsumer { private String dmaapTopicName; private String consumerId; private String consumerGroup; + private Map pnfCorrelationIdToThreadMap; + private HttpGet getRequest; + private ScheduledExecutorService executor; + private int dmaapClientInitialDelayInSeconds; + private int dmaapClientDelayInSeconds; + private boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + executor = null; + } + + public void init() { + getRequest = new HttpGet(buildURI()); + } + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + @Override + public void registerForUpdate(String correlationId, Runnable informConsumer) { + pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); + if (!dmaapThreadListenerIsRunning) { + startDmaapThreadListener(); + } } - public void notifyWhenPnfReady(String correlationId) - throws IOException { - HttpGet getRequest = new HttpGet(buildURI(consumerGroup, consumerId)); - HttpResponse response = httpClient.execute(getRequest); - checkIfResponseIsAccepted(response, correlationId); + private void startDmaapThreadListener() { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; } - private boolean checkIfResponseIsAccepted(HttpResponse response, String correlationId) { - // TODO parse response if contains proper correlationId - return false; + private void stopDmaapThreadListener() { + if (dmaapThreadListenerIsRunning) { + executor.shutdownNow(); + dmaapThreadListenerIsRunning = false; + executor = null; + } } - private URI buildURI(String consumerGroup, String consumerId) { + private URI buildURI() { return UriBuilder.fromUri(dmaapUriPathPrefix) .scheme(dmaapProtocol) .host(dmaapHost) @@ -65,6 +109,31 @@ public class PnfEventReadyConsumer { .path(consumerGroup).path(consumerId).build(); } + private Optional getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + + private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() + .ifPresent(this::informAboutPnfReady); + } + + private void informAboutPnfReady(String correlationId) { + pnfCorrelationIdToThreadMap.get(correlationId).run(); + pnfCorrelationIdToThreadMap.remove(correlationId); + + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + } + public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -93,4 +162,12 @@ public class PnfEventReadyConsumer { this.consumerGroup = consumerGroup; } + public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { + this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; + } + + public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { + this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; + } + } -- cgit 1.2.3-korg