summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSeshu Kumar M <seshu.kumar.m@huawei.com>2018-04-19 09:46:07 +0000
committerGerrit Code Review <gerrit@onap.org>2018-04-19 09:46:07 +0000
commite75e31d0d63c3ef9f63f1d81edcef0e47ad8123a (patch)
tree9055940767068bef28906e756a65e1aa451ca031
parent7f954da4321865a7bb3d69e7a29687f8d9d1dff9 (diff)
parent191f0bb93a91538a8c46b38a60935ead70dcb8a9 (diff)
Merge "Synchronization fix for dmaap client"
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java2
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java)2
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java49
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml1
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java2
-rw-r--r--bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java7
6 files changed, 28 insertions, 35 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
index 5a8d741a05..edff36fe68 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
@@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient;
import org.springframework.beans.factory.annotation.Autowired;
public class InformDmaapClient implements JavaDelegate {
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
index 07e8ada21e..c6b6be6842 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.openecomp.mso.bpmn.infrastructure.pnf.implementation;
+package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
public interface DmaapClient {
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
index 6871665ba1..8c9903e87a 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
@@ -20,6 +20,7 @@
package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements Runnable, DmaapClient {
+public class PnfEventReadyConsumer implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
@@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private HttpGet getRequest;
private ScheduledExecutorService executor;
- private int dmaapClientInitialDelayInSeconds;
private int dmaapClientDelayInSeconds;
- private boolean dmaapThreadListenerIsRunning;
+ private volatile boolean dmaapThreadListenerIsRunning;
public PnfEventReadyConsumer() {
httpClient = HttpClientBuilder.create().build();
@@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
getRequest = new HttpGet(buildURI());
}
- @Override
- public void run() {
+ //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
+ @VisibleForTesting
+ void sendRequest() {
try {
HttpResponse response = httpClient.execute(getRequest);
getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
@@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
}
@Override
- public void registerForUpdate(String correlationId, Runnable informConsumer) {
+ public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
if (!dmaapThreadListenerIsRunning) {
startDmaapThreadListener();
}
}
- private void startDmaapThreadListener() {
- executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds,
- dmaapClientDelayInSeconds, TimeUnit.SECONDS);
- dmaapThreadListenerIsRunning = true;
+ private synchronized void startDmaapThreadListener() {
+ if (!dmaapThreadListenerIsRunning) {
+ executor = Executors.newScheduledThreadPool(1);
+ executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+ dmaapThreadListenerIsRunning = true;
+ }
}
- private void stopDmaapThreadListener() {
+ private synchronized void stopDmaapThreadListener() {
if (dmaapThreadListenerIsRunning) {
executor.shutdownNow();
dmaapThreadListenerIsRunning = false;
@@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
}
- private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny()
- .ifPresent(this::informAboutPnfReady);
- }
-
- private void informAboutPnfReady(String correlationId) {
- pnfCorrelationIdToThreadMap.get(correlationId).run();
- pnfCorrelationIdToThreadMap.remove(correlationId);
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+ if (runnable != null) {
+ runnable.run();
- if (pnfCorrelationIdToThreadMap.isEmpty()) {
- stopDmaapThreadListener();
+ if (pnfCorrelationIdToThreadMap.isEmpty()) {
+ stopDmaapThreadListener();
+ }
}
}
@@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
this.consumerGroup = consumerGroup;
}
- public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) {
- this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds;
- }
-
public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
index cbb8266bf7..03fff4f974 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
+++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
@@ -27,7 +27,6 @@
<property name="dmaapTopicName" value="${eventReadyTopicName}"/>
<property name="consumerGroup" value="${consumerGroup}"/>
<property name="consumerId" value="${consumerId}"/>
- <property name="dmaapClientInitialDelayInSeconds" value="${clientThreadInitialDelayInSeconds}"/>
<property name="dmaapClientDelayInSeconds" value="${clientThreadDelayInSeconds}"/>
</bean>
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
index 1103597157..55dd3a968f 100644
--- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
+++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
@@ -20,7 +20,7 @@
package org.openecomp.mso.bpmn.infrastructure.pnf.delegate;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient;
public class DmaapClientTestImpl implements DmaapClient {
private String correlationId;
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
index ef8fa3dd1e..73b8247ebc 100644
--- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
+++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
@@ -76,7 +76,6 @@ public class PnfEventReadyConsumerTest {
testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
testedObject.setConsumerId(CONSUMER_ID);
testedObject.setConsumerGroup(CONSUMER_GROUP);
- testedObject.setDmaapClientInitialDelayInSeconds(1);
testedObject.setDmaapClientDelayInSeconds(1);
testedObject.init();
httpClientMock = mock(HttpClient.class);
@@ -97,7 +96,7 @@ public class PnfEventReadyConsumerTest {
throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
- testedObject.run();
+ testedObject.sendRequest();
ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
verify(httpClientMock).execute(captor1.capture());
assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
@@ -120,7 +119,7 @@ public class PnfEventReadyConsumerTest {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(
String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
- testedObject.run();
+ testedObject.sendRequest();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
@@ -134,7 +133,7 @@ public class PnfEventReadyConsumerTest {
public void correlationIdIsNotFoundInHttpResponse() throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
- testedObject.run();
+ testedObject.sendRequest();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}