diff options
author | Seshu Kumar M <seshu.kumar.m@huawei.com> | 2018-04-19 09:46:07 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-04-19 09:46:07 +0000 |
commit | e75e31d0d63c3ef9f63f1d81edcef0e47ad8123a (patch) | |
tree | 9055940767068bef28906e756a65e1aa451ca031 | |
parent | 7f954da4321865a7bb3d69e7a29687f8d9d1dff9 (diff) | |
parent | 191f0bb93a91538a8c46b38a60935ead70dcb8a9 (diff) |
Merge "Synchronization fix for dmaap client"
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java | 2 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java (renamed from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java) | 2 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java | 49 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml | 1 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java | 2 | ||||
-rw-r--r-- | bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java | 7 |
6 files changed, 28 insertions, 35 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java index 5a8d741a05..edff36fe68 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java @@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient; import org.springframework.beans.factory.annotation.Autowired; public class InformDmaapClient implements JavaDelegate { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java index 07e8ada21e..c6b6be6842 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.mso.bpmn.infrastructure.pnf.implementation; +package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; public interface DmaapClient { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index 6871665ba1..8c9903e87a 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -20,6 +20,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements Runnable, DmaapClient { +public class PnfEventReadyConsumer implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { private Map<String, Runnable> pnfCorrelationIdToThreadMap; private HttpGet getRequest; private ScheduledExecutorService executor; - private int dmaapClientInitialDelayInSeconds; private int dmaapClientDelayInSeconds; - private boolean dmaapThreadListenerIsRunning; + private volatile boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); @@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { getRequest = new HttpGet(buildURI()); } - @Override - public void run() { + //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting + @VisibleForTesting + void sendRequest() { try { HttpResponse response = httpClient.execute(getRequest); getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); @@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } @Override - public void registerForUpdate(String correlationId, Runnable informConsumer) { + public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); } } - private void startDmaapThreadListener() { - executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, - dmaapClientDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + private synchronized void startDmaapThreadListener() { + if (!dmaapThreadListenerIsRunning) { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this::sendRequest, 0, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; + } } - private void stopDmaapThreadListener() { + private synchronized void stopDmaapThreadListener() { if (dmaapThreadListenerIsRunning) { executor.shutdownNow(); dmaapThreadListenerIsRunning = false; @@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } - private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() - .ifPresent(this::informAboutPnfReady); - } - - private void informAboutPnfReady(String correlationId) { - pnfCorrelationIdToThreadMap.get(correlationId).run(); - pnfCorrelationIdToThreadMap.remove(correlationId); + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + if (runnable != null) { + runnable.run(); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - stopDmaapThreadListener(); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } } } @@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { this.consumerGroup = consumerGroup; } - public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { - this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; - } - public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml index cbb8266bf7..03fff4f974 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml +++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml @@ -27,7 +27,6 @@ <property name="dmaapTopicName" value="${eventReadyTopicName}"/>
<property name="consumerGroup" value="${consumerGroup}"/>
<property name="consumerId" value="${consumerId}"/>
- <property name="dmaapClientInitialDelayInSeconds" value="${clientThreadInitialDelayInSeconds}"/>
<property name="dmaapClientDelayInSeconds" value="${clientThreadDelayInSeconds}"/>
</bean>
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java index 1103597157..55dd3a968f 100644 --- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java +++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java @@ -20,7 +20,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient; public class DmaapClientTestImpl implements DmaapClient { private String correlationId; diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java index ef8fa3dd1e..73b8247ebc 100644 --- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java +++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java @@ -76,7 +76,6 @@ public class PnfEventReadyConsumerTest { testedObject.setDmaapTopicName(EVENT_TOPIC_TEST); testedObject.setConsumerId(CONSUMER_ID); testedObject.setConsumerGroup(CONSUMER_GROUP); - testedObject.setDmaapClientInitialDelayInSeconds(1); testedObject.setDmaapClientDelayInSeconds(1); testedObject.init(); httpClientMock = mock(HttpClient.class); @@ -97,7 +96,7 @@ public class PnfEventReadyConsumerTest { throws IOException { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID))); - testedObject.run(); + testedObject.sendRequest(); ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class); verify(httpClientMock).execute(captor1.capture()); assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL) @@ -120,7 +119,7 @@ public class PnfEventReadyConsumerTest { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse( String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP))); - testedObject.run(); + testedObject.sendRequest(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } @@ -134,7 +133,7 @@ public class PnfEventReadyConsumerTest { public void correlationIdIsNotFoundInHttpResponse() throws IOException { when(httpClientMock.execute(any(HttpGet.class))). thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID)); - testedObject.run(); + testedObject.sendRequest(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } |