summaryrefslogtreecommitdiffstats
path: root/bpmn/so-bpmn-infrastructure-common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'bpmn/so-bpmn-infrastructure-common/src/main')
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java23
1 files changed, 13 insertions, 10 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 353b4e32c5..70323b726c 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -26,8 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.UriBuilder;
@@ -49,10 +48,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA, PnfEventReadyDmaapClient.class);
- private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId";
-
- @Autowired
- private Environment env;
+ private final Environment env;
private HttpClient httpClient;
private String dmaapHost;
private int dmaapPort;
@@ -63,10 +59,15 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private String consumerGroup;
private Map<String, Runnable> pnfCorrelationIdToThreadMap;
private HttpGet getRequest;
- private ScheduledExecutorService executor;
private int dmaapClientDelayInSeconds;
+ private volatile ScheduledThreadPoolExecutor executor;
private volatile boolean dmaapThreadListenerIsRunning;
+ @Autowired
+ public PnfEventReadyDmaapClient(Environment env) {
+ this.env = env;
+ }
+
public void init() {
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
@@ -97,7 +98,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private synchronized void startDmaapThreadListener() {
if (!dmaapThreadListenerIsRunning) {
- executor = Executors.newScheduledThreadPool(1);
+ executor = new ScheduledThreadPoolExecutor(1);
+ executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
dmaapClientDelayInSeconds, TimeUnit.SECONDS);
dmaapThreadListenerIsRunning = true;
@@ -106,7 +109,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private synchronized void stopDmaapThreadListener() {
if (dmaapThreadListenerIsRunning) {
- executor.shutdownNow();
+ executor.shutdown();
dmaapThreadListenerIsRunning = false;
executor = null;
}
@@ -166,7 +169,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
return Collections.emptyList();
}
- private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
Runnable runnable = unregister(correlationId);
if (runnable != null) {
LOGGER.debug("pnf ready event got from dmaap for correlationId: " + correlationId);