From 191f0bb93a91538a8c46b38a60935ead70dcb8a9 Mon Sep 17 00:00:00 2001 From: biniek Date: Tue, 17 Apr 2018 16:08:12 +0200 Subject: Synchronization fix for dmaap client Change-Id: Ibcad191dc0994c8c4498ebdbc82e4c1f694517bd Issue-ID: SO-506 Signed-off-by: biniek --- .../pnf/delegate/InformDmaapClient.java | 2 +- .../bpmn/infrastructure/pnf/dmaap/DmaapClient.java | 26 ++++++++++++ .../pnf/dmaap/PnfEventReadyConsumer.java | 49 ++++++++++------------ .../pnf/implementation/DmaapClient.java | 26 ------------ 4 files changed, 49 insertions(+), 54 deletions(-) create mode 100644 bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java delete mode 100644 bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org') diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java index 5a8d741a05..edff36fe68 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java @@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.JavaDelegate; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient; import org.springframework.beans.factory.annotation.Autowired; public class InformDmaapClient implements JavaDelegate { diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java new file mode 100644 index 0000000000..c6b6be6842 --- /dev/null +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; + +public interface DmaapClient { + + void registerForUpdate(String correlationId, Runnable informConsumer); +} diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index 6871665ba1..8c9903e87a 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -20,6 +20,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; import java.util.Map; @@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; -import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; import org.openecomp.mso.jsonpath.JsonPathUtil; import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer implements Runnable, DmaapClient { +public class PnfEventReadyConsumer implements DmaapClient { private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); @@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { private Map pnfCorrelationIdToThreadMap; private HttpGet getRequest; private ScheduledExecutorService executor; - private int dmaapClientInitialDelayInSeconds; private int dmaapClientDelayInSeconds; - private boolean dmaapThreadListenerIsRunning; + private volatile boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); @@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { getRequest = new HttpGet(buildURI()); } - @Override - public void run() { + //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting + @VisibleForTesting + void sendRequest() { try { HttpResponse response = httpClient.execute(getRequest); getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); @@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } @Override - public void registerForUpdate(String correlationId, Runnable informConsumer) { + public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); } } - private void startDmaapThreadListener() { - executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, - dmaapClientDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; + private synchronized void startDmaapThreadListener() { + if (!dmaapThreadListenerIsRunning) { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this::sendRequest, 0, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; + } } - private void stopDmaapThreadListener() { + private synchronized void stopDmaapThreadListener() { if (dmaapThreadListenerIsRunning) { executor.shutdownNow(); dmaapThreadListenerIsRunning = false; @@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { } - private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() - .ifPresent(this::informAboutPnfReady); - } - - private void informAboutPnfReady(String correlationId) { - pnfCorrelationIdToThreadMap.get(correlationId).run(); - pnfCorrelationIdToThreadMap.remove(correlationId); + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + if (runnable != null) { + runnable.run(); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - stopDmaapThreadListener(); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } } } @@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient { this.consumerGroup = consumerGroup; } - public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { - this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; - } - public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java deleted file mode 100644 index 07e8ada21e..0000000000 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java +++ /dev/null @@ -1,26 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.mso.bpmn.infrastructure.pnf.implementation; - -public interface DmaapClient { - - void registerForUpdate(String correlationId, Runnable informConsumer); -} -- cgit 1.2.3-korg