diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritization/src/main')
10 files changed, 37 insertions, 37 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 77adb0a87..d8e71d413 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -18,19 +18,19 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction -import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** * Register the MessagePrioritizationStateService and exposed dependency */ -fun BlueprintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = +fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = instance(MessagePrioritizationStateService::class) /** * Expose messagePrioritizationStateService to AbstractComponentFunction */ fun AbstractComponentFunction.messagePrioritizationStateService() = - BlueprintDependencyService.messagePrioritizationStateService() + BluePrintDependencyService.messagePrioritizationStateService() /** * MessagePrioritization correlation extensions diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 341a2aa8d..d4f8470c8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -17,10 +17,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBlueprintMessageProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ -abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBlueprintMessageProcessor<K, V>() { +abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { override fun init(processorContext: ProcessorContext) { this.processorContext = processorContext diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index 6c1f7478c..1b0612492 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -23,7 +23,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -37,7 +37,7 @@ open class DefaultMessagePrioritizeProcessor( override suspend fun processNB(key: ByteArray, value: ByteArray) { val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BlueprintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") try { kafkaMessagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { @@ -62,7 +62,7 @@ open class DefaultMessagePrioritizeProcessor( if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) } else { - throw BlueprintProcessorException( + throw BluePrintProcessorException( "messagePrioritizationService is not instance of " + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" ) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt index ecdcbff08..4ab399f54 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -24,15 +24,15 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.P import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class KafkaMessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService, + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, private val kafkaMessagePrioritizationService: MessagePrioritizationService ) { @@ -50,7 +50,7 @@ open class KafkaMessagePrioritizationConsumer( return object : KafkaStreamConsumerFunction { val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BlueprintProcessorException("failed to get kafka consumer configuration") + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") override suspend fun createTopology( messageConsumerProperties: MessageConsumerProperties, @@ -91,7 +91,7 @@ open class KafkaMessagePrioritizationConsumer( suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BlueprintProcessorException("failed to get kafka consumer configuration") + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt index 4b1a3f375..5595863d4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt @@ -20,7 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.serialization.Serializer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import java.nio.charset.Charset @@ -37,7 +37,7 @@ open class MessagePrioritizationSerde : Serde<MessagePrioritization> { return object : Deserializer<MessagePrioritization> { override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BlueprintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") } override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt index babddbde4..502a7822d 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BlueprintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType @@ -34,7 +34,7 @@ abstract class AbstractNatsMessagePrioritizationService( private val log = logger(AbstractNatsMessagePrioritizationService::class) - lateinit var bluePrintNatsService: BlueprintNatsService + lateinit var bluePrintNatsService: BluePrintNatsService override suspend fun output(messages: List<MessagePrioritization>) { log.info("$$$$$ received in output processor id(${messages.ids()})") diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt index 0976127d1..a0b2cf462 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -22,29 +22,29 @@ import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BlueprintNatsLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BlueprintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.asType import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils open class NatsMessagePrioritizationConsumer( - private val bluePrintNatsLibPropertyService: BlueprintNatsLibPropertyService, + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, private val natsMessagePrioritizationService: MessagePrioritizationService ) { private val log = logger(NatsMessagePrioritizationConsumer::class) - lateinit var bluePrintNatsService: BlueprintNatsService + lateinit var bluePrintNatsService: BluePrintNatsService private lateinit var subscription: Subscription suspend fun startConsuming() { val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() val natsConfiguration = prioritizationConfiguration.natsConfiguration - ?: throw BlueprintProcessorException("couldn't get NATS consumer configuration") + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." @@ -73,7 +73,7 @@ open class NatsMessagePrioritizationConsumer( log.info("Nats prioritization consumer listener shutdown complete") } - private fun consumerService(selector: String): BlueprintNatsService { + private fun consumerService(selector: String): BluePrintNatsService { return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) } diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt index b66a2bd92..ed16fd44f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -21,7 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate @@ -48,7 +48,7 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getMessage(id: String): MessagePrioritization { return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BlueprintProcessorException("couldn't find message for id($id)") + ?: throw BluePrintProcessorException("couldn't find message for id($id)") } override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? { diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt index f42be52af..7ab0be098 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -20,7 +20,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.C import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException object MessageCorrelationUtils { @@ -50,7 +50,7 @@ object MessageCorrelationUtils { val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } if (!unknownMessageTypes.isNullOrEmpty()) { - throw BlueprintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") } val copyTypes = types.toTypedArray().copyOf().toMutableList() diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 3eba4ed86..86cec3697 100644 --- a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -22,14 +22,14 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService object MessageProcessorUtils { /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { - val clusterService = BlueprintDependencyService.optionalClusterService() + val clusterService = BluePrintDependencyService.optionalClusterService() return if (clusterService != null && clusterService.clusterJoined() && !messagePrioritization.correlationId.isNullOrBlank() @@ -39,31 +39,31 @@ object MessageProcessorUtils { val lockName = "prioritize::${messagePrioritization.group}::$correlationId" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() - if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)") + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") clusterLock } else null } /** Utility to create the cluster lock for expiry scheduler*/ suspend fun prioritizationExpiryLock(): ClusterLock? { - val clusterService = BlueprintDependencyService.optionalClusterService() + val clusterService = BluePrintDependencyService.optionalClusterService() return if (clusterService != null && clusterService.clusterJoined()) { val lockName = "prioritize-expiry" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() - if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)") + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") clusterLock } else null } /** Utility to create the cluster lock for expiry scheduler*/ suspend fun prioritizationCleanLock(): ClusterLock? { - val clusterService = BlueprintDependencyService.optionalClusterService() + val clusterService = BluePrintDependencyService.optionalClusterService() return if (clusterService != null && clusterService.clusterJoined()) { val lockName = "prioritize-clean" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() - if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)") + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") clusterLock } else null } @@ -80,7 +80,7 @@ object MessageProcessorUtils { fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor - BlueprintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) + BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) } } } |