diff options
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp')
4 files changed, 55 insertions, 7 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java new file mode 100644 index 0000000000..92169b202b --- /dev/null +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscription.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; + +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient; +import org.springframework.beans.factory.annotation.Autowired; + +public class CancelDmaapSubscription implements JavaDelegate { + + private DmaapClient dmaapClient; + + @Override + public void execute(DelegateExecution execution) throws Exception { + String correlationId = (String) execution.getVariable(ExecutionVariableNames.CORRELATION_ID); + dmaapClient.unregister(correlationId); + } + + @Autowired + public void setDmaapClient(DmaapClient dmaapClient) { + this.dmaapClient = dmaapClient; + } +} diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/ExecutionVariableNames.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/ExecutionVariableNames.java index 0d64f2c8b7..94d221a054 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/ExecutionVariableNames.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/ExecutionVariableNames.java @@ -23,8 +23,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate; @SuppressWarnings("ALL") public class ExecutionVariableNames { - private ExecutionVariableNames() { - } + private ExecutionVariableNames() {} public final static String CORRELATION_ID = "correlationId"; public final static String AAI_CONTAINS_INFO_ABOUT_PNF = "aaiContainsInfoAboutPnf"; diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java index c6b6be6842..12a6c7c51f 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java @@ -23,4 +23,6 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; public interface DmaapClient { void registerForUpdate(String correlationId, Runnable informConsumer); + + Runnable unregister(String correlationId); } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index 8c9903e87a..1fd2de97f2 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -86,6 +86,15 @@ public class PnfEventReadyConsumer implements DmaapClient { } } + @Override + public synchronized Runnable unregister(String correlationId) { + Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + return runnable; + } + private synchronized void startDmaapThreadListener() { if (!dmaapThreadListenerIsRunning) { executor = Executors.newScheduledThreadPool(1); @@ -123,13 +132,9 @@ public class PnfEventReadyConsumer implements DmaapClient { private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + Runnable runnable = unregister(correlationId); if (runnable != null) { runnable.run(); - - if (pnfCorrelationIdToThreadMap.isEmpty()) { - stopDmaapThreadListener(); - } } } |