From 0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 7 Jan 2020 14:56:53 -0500 Subject: Include correlationId in group lock. Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked. Optimized Atomix Junit Test cases. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1 --- .../message/prioritization/MessagePrioritizeExtensions.kt | 14 +++++++++----- .../message/prioritization/utils/MessageProcessorUtils.kt | 9 +++++++-- 2 files changed, 16 insertions(+), 7 deletions(-) (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 39d081455..bef7a7b61 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -36,14 +36,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() = /** * MessagePrioritization correlation extensions */ + +/** + * Arrange comma separated correlation keys in ascending order. + */ fun MessagePrioritization.toFormatedCorrelation(): String { - val ascendingKey = this.correlationId!!.split(",") + return this.correlationId!!.split(",") .map { it.trim() }.sorted().joinToString(",") - return ascendingKey } +/** + * Used to group the correlation with respect to types. + */ fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { - val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") - return TypeCorrelationKey(this.type, ascendingKey) + return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index d1f38f4f2..49230b6e4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -22,6 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -32,8 +33,12 @@ object MessageProcessorUtils { clusterService: BluePrintClusterService?, messagePrioritization: MessagePrioritization ): ClusterLock? { - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritization-${messagePrioritization.group}" + return if (clusterService != null && clusterService.clusterJoined() && + !messagePrioritization.correlationId.isNullOrBlank() + ) { + // Get the correlation key in ascending order, even it it is misplaced + val correlationId = messagePrioritization.toFormatedCorrelation() + val lockName = "prioritization-${messagePrioritization.group}-$correlationId" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") -- cgit 1.2.3-korg