From 4f4e2de08d3c6259da2497950a96d549d3e82f8a Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 2 Jan 2020 11:59:29 -0500 Subject: Message Prioritization message group lock. Implementation to avoid concurrent procession of message group while prioritization. Sample message prioritization Kafka listener properties. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb --- .../atomix/service/AtomixBluePrintClusterService.kt | 7 +++++++ .../blueprintsprocessor/core/service/BluePrintClusterService.kt | 1 + 2 files changed, 8 insertions(+) (limited to 'ms/blueprintsprocessor/modules/commons') diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 0690eb89d..214a14310 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -148,12 +148,18 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + val log = logger(ClusterLockImpl::class) lateinit var distributedLock: DistributedLock + override fun name(): String { + return distributedLock.name() + } + override suspend fun lock() { distributedLock = AtomixLibUtils.distributedLock(atomix, name) distributedLock.lock() + log.debug("Cluster lock($name) created..") } override suspend fun tryLock(timeout: Long): Boolean { @@ -163,6 +169,7 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String) override suspend fun unLock() { distributedLock.unlock() + log.debug("Cluster unlock(${name()}) successfully..") } override fun isLocked(): Boolean { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 21fcfc509..f994628a2 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -57,6 +57,7 @@ data class ClusterInfo( data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) interface ClusterLock { + fun name(): String suspend fun lock() suspend fun tryLock(timeout: Long): Boolean suspend fun unLock() -- cgit 1.2.3-korg