summaryrefslogtreecommitdiffstats
path: root/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso
diff options
context:
space:
mode:
authorbiniek <lukasz.biniek@nokia.com>2018-04-17 16:08:12 +0200
committerbiniek <lukasz.biniek@nokia.com>2018-04-17 16:08:12 +0200
commit191f0bb93a91538a8c46b38a60935ead70dcb8a9 (patch)
tree693ad682da0d968c579f7f23ad9c9217dfacd5a4 /bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso
parent5304d1554c35486203ca86a353d9059996430ad3 (diff)
Synchronization fix for dmaap client
Change-Id: Ibcad191dc0994c8c4498ebdbc82e4c1f694517bd Issue-ID: SO-506 Signed-off-by: biniek <lukasz.biniek@nokia.com>
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso')
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java2
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java)2
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java49
3 files changed, 24 insertions, 29 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
index 5a8d741a05..edff36fe68 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
@@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient;
import org.springframework.beans.factory.annotation.Autowired;
public class InformDmaapClient implements JavaDelegate {
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
index 07e8ada21e..c6b6be6842 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.mso.bpmn.infrastructure.pnf.implementation;
+package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
public interface DmaapClient {
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
index 6871665ba1..8c9903e87a 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
@@ -20,6 +20,7 @@
package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements Runnable, DmaapClient {
+public class PnfEventReadyConsumer implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
@@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private HttpGet getRequest;
private ScheduledExecutorService executor;
- private int dmaapClientInitialDelayInSeconds;
private int dmaapClientDelayInSeconds;
- private boolean dmaapThreadListenerIsRunning;
+ private volatile boolean dmaapThreadListenerIsRunning;
public PnfEventReadyConsumer() {
httpClient = HttpClientBuilder.create().build();
@@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
getRequest = new HttpGet(buildURI());
}
- @Override
- public void run() {
+ //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
+ @VisibleForTesting
+ void sendRequest() {
try {
HttpResponse response = httpClient.execute(getRequest);
getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
@@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
}
@Override
- public void registerForUpdate(String correlationId, Runnable informConsumer) {
+ public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
if (!dmaapThreadListenerIsRunning) {
startDmaapThreadListener();
}
}
- private void startDmaapThreadListener() {
- executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds,
- dmaapClientDelayInSeconds, TimeUnit.SECONDS);
- dmaapThreadListenerIsRunning = true;
+ private synchronized void startDmaapThreadListener() {
+ if (!dmaapThreadListenerIsRunning) {
+ executor = Executors.newScheduledThreadPool(1);
+ executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+ dmaapThreadListenerIsRunning = true;
+ }
}
- private void stopDmaapThreadListener() {
+ private synchronized void stopDmaapThreadListener() {
if (dmaapThreadListenerIsRunning) {
executor.shutdownNow();
dmaapThreadListenerIsRunning = false;
@@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
}
- private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny()
- .ifPresent(this::informAboutPnfReady);
- }
-
- private void informAboutPnfReady(String correlationId) {
- pnfCorrelationIdToThreadMap.get(correlationId).run();
- pnfCorrelationIdToThreadMap.remove(correlationId);
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+ if (runnable != null) {
+ runnable.run();
- if (pnfCorrelationIdToThreadMap.isEmpty()) {
- stopDmaapThreadListener();
+ if (pnfCorrelationIdToThreadMap.isEmpty()) {
+ stopDmaapThreadListener();
+ }
}
}
@@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
this.consumerGroup = consumerGroup;
}
- public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) {
- this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds;
- }
-
public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}