summaryrefslogtreecommitdiffstats
path: root/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp
diff options
context:
space:
mode:
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp')
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java)66
1 files changed, 33 insertions, 33 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 1fd2de97f2..830574bad4 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -20,7 +20,6 @@
package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@@ -38,7 +37,7 @@ import org.apache.http.util.EntityUtils;
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements DmaapClient {
+public class PnfEventReadyDmaapClient implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
@@ -57,7 +56,7 @@ public class PnfEventReadyConsumer implements DmaapClient {
private int dmaapClientDelayInSeconds;
private volatile boolean dmaapThreadListenerIsRunning;
- public PnfEventReadyConsumer() {
+ public PnfEventReadyDmaapClient() {
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
executor = null;
@@ -67,17 +66,6 @@ public class PnfEventReadyConsumer implements DmaapClient {
getRequest = new HttpGet(buildURI());
}
- //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
- @VisibleForTesting
- void sendRequest() {
- try {
- HttpResponse response = httpClient.execute(getRequest);
- getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
- } catch (IOException e) {
- LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
- }
- }
-
@Override
public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
@@ -98,7 +86,7 @@ public class PnfEventReadyConsumer implements DmaapClient {
private synchronized void startDmaapThreadListener() {
if (!dmaapThreadListenerIsRunning) {
executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
dmaapClientDelayInSeconds, TimeUnit.SECONDS);
dmaapThreadListenerIsRunning = true;
}
@@ -120,24 +108,6 @@ public class PnfEventReadyConsumer implements DmaapClient {
.path(consumerGroup).path(consumerId).build();
}
- private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
- if (response.getStatusLine().getStatusCode() == 200) {
- String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
- if (responseString != null) {
- return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
- }
- }
- return Optional.empty();
- }
-
-
- private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- Runnable runnable = unregister(correlationId);
- if (runnable != null) {
- runnable.run();
- }
- }
-
public void setDmaapHost(String dmaapHost) {
this.dmaapHost = dmaapHost;
}
@@ -170,4 +140,34 @@ public class PnfEventReadyConsumer implements DmaapClient {
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}
+ class DmaapTopicListenerThread implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ HttpResponse response = httpClient.execute(getRequest);
+ getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+ } catch (IOException e) {
+ LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+ }
+ }
+
+ private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (responseString != null) {
+ return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = unregister(correlationId);
+ if (runnable != null) {
+ runnable.run();
+ }
+ }
+ }
+
}