diff options
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org')
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java | 2 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java) | 2 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java | 49 |
3 files changed, 24 insertions, 29 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java index 5a8d741a05..edff36fe68 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java @@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient; import org.springframework.beans.factory.annotation.Autowired; public class InformDmaapClient implements JavaDelegate { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java index 07e8ada21e..c6b6be6842 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.mso.bpmn.infrastructure.pnf.implementation; +package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; public interface DmaapClient { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index 6871665ba1..8c9903e87a 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -20,6 +20,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements Runnable, DmaapClient { +public class PnfEventReadyConsumer implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { private Map<String, Runnable> pnfCorrelationIdToThreadMap; private HttpGet getRequest; private ScheduledExecutorService executor; - private int dmaapClientInitialDelayInSeconds; private int dmaapClientDelayInSeconds; - private boolean dmaapThreadListenerIsRunning; + private volatile boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); @@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { getRequest = new HttpGet(buildURI()); } - @Override - public void run() { + //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting + @VisibleForTesting + void sendRequest() { try { HttpResponse response = httpClient.execute(getRequest); getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); @@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } @Override - public void registerForUpdate(String correlationId, Runnable informConsumer) { + public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); } } - private void startDmaapThreadListener() { - executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, - dmaapClientDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + private synchronized void startDmaapThreadListener() { + if (!dmaapThreadListenerIsRunning) { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this::sendRequest, 0, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; + } } - private void stopDmaapThreadListener() { + private synchronized void stopDmaapThreadListener() { if (dmaapThreadListenerIsRunning) { executor.shutdownNow(); dmaapThreadListenerIsRunning = false; @@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } - private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() - .ifPresent(this::informAboutPnfReady); - } - - private void informAboutPnfReady(String correlationId) { - pnfCorrelationIdToThreadMap.get(correlationId).run(); - pnfCorrelationIdToThreadMap.remove(correlationId); + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + if (runnable != null) { + runnable.run(); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - stopDmaapThreadListener(); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } } } @@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { this.consumerGroup = consumerGroup; } - public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { - this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; - } - public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } |