diff options
author | Lukasz Muszkieta <lukasz.muszkieta@nokia.com> | 2018-04-25 11:54:47 +0200 |
---|---|---|
committer | Lukasz Muszkieta <lukasz.muszkieta@nokia.com> | 2018-04-25 11:58:28 +0200 |
commit | 7c73254c00e5dc598f68e074d160b2d263872cda (patch) | |
tree | 06eff647af12b7215c28e05ab681117ca0348a8b /bpmn/MSOInfrastructureBPMN/src/main/java | |
parent | c7c76f0e6d6aacf6f1b1cd07a42360291fde7e85 (diff) |
PnfReadyEventConsumer implementation
Change-Id: Ic8d5814c555bad436bfcbe1b4e212637a6800947
Issue-ID: SO-466
Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java')
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java) | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index 1fd2de97f2..830574bad4 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -20,7 +20,6 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.http.util.EntityUtils; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements DmaapClient { +public class PnfEventReadyDmaapClient implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -57,7 +56,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private int dmaapClientDelayInSeconds; private volatile boolean dmaapThreadListenerIsRunning; - public PnfEventReadyConsumer() { + public PnfEventReadyDmaapClient() { httpClient = HttpClientBuilder.create().build(); pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); executor = null; @@ -67,17 +66,6 @@ public class PnfEventReadyConsumer implements DmaapClient { getRequest = new HttpGet(buildURI()); } - //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting - @VisibleForTesting - void sendRequest() { - try { - HttpResponse response = httpClient.execute(getRequest); - getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); - } catch (IOException e) { - LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } - } - @Override public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); @@ -98,7 +86,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private synchronized void startDmaapThreadListener() { if (!dmaapThreadListenerIsRunning) { executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this::sendRequest, 0, + executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, dmaapClientDelayInSeconds, TimeUnit.SECONDS); dmaapThreadListenerIsRunning = true; } @@ -120,24 +108,6 @@ public class PnfEventReadyConsumer implements DmaapClient { .path(consumerGroup).path(consumerId).build(); } - private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); - } - } - return Optional.empty(); - } - - - private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - Runnable runnable = unregister(correlationId); - if (runnable != null) { - runnable.run(); - } - } - public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -170,4 +140,34 @@ public class PnfEventReadyConsumer implements DmaapClient { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } + class DmaapTopicListenerThread implements Runnable { + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = unregister(correlationId); + if (runnable != null) { + runnable.run(); + } + } + } + } |