diff options
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java')
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java) | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index 1fd2de97f2..830574bad4 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -20,7 +20,6 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -38,7 +37,7 @@ import org.apache.http.util.EntityUtils; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements DmaapClient { +public class PnfEventReadyDmaapClient implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -57,7 +56,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private int dmaapClientDelayInSeconds; private volatile boolean dmaapThreadListenerIsRunning; - public PnfEventReadyConsumer() { + public PnfEventReadyDmaapClient() { httpClient = HttpClientBuilder.create().build(); pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); executor = null; @@ -67,17 +66,6 @@ public class PnfEventReadyConsumer implements DmaapClient { getRequest = new HttpGet(buildURI()); } - //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting - @VisibleForTesting - void sendRequest() { - try { - HttpResponse response = httpClient.execute(getRequest); - getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); - } catch (IOException e) { - LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } - } - @Override public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); @@ -98,7 +86,7 @@ public class PnfEventReadyConsumer implements DmaapClient { private synchronized void startDmaapThreadListener() { if (!dmaapThreadListenerIsRunning) { executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this::sendRequest, 0, + executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, dmaapClientDelayInSeconds, TimeUnit.SECONDS); dmaapThreadListenerIsRunning = true; } @@ -120,24 +108,6 @@ public class PnfEventReadyConsumer implements DmaapClient { .path(consumerGroup).path(consumerId).build(); } - private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); - } - } - return Optional.empty(); - } - - - private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - Runnable runnable = unregister(correlationId); - if (runnable != null) { - runnable.run(); - } - } - public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -170,4 +140,34 @@ public class PnfEventReadyConsumer implements DmaapClient { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } + class DmaapTopicListenerThread implements Runnable { + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = unregister(correlationId); + if (runnable != null) { + runnable.run(); + } + } + } + } |