diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
8 files changed, 132 insertions, 119 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 424929b82..65b7644a8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import java.io.Serializable object MessageActionConstants { + const val PRIORITIZE = "prioritize" } @@ -33,6 +34,7 @@ enum class MessageState(val id: String) { } open class PrioritizationConfiguration : Serializable { + lateinit var expiryConfiguration: ExpiryConfiguration lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration @@ -41,12 +43,14 @@ open class PrioritizationConfiguration : Serializable { } open class KafkaConfiguration : Serializable { + lateinit var inputTopicSelector: String // Consumer Configuration Selector lateinit var expiredTopic: String // Publish Configuration Selector lateinit var outputTopic: String // Publish Configuration Selector } open class NatsConfiguration : Serializable { + lateinit var connectionSelector: String // Consumer Configuration Selector lateinit var inputSubject: String // Publish Configuration Selector lateinit var expiredSubject: String // Publish Configuration Selector @@ -54,20 +58,24 @@ open class NatsConfiguration : Serializable { } open class ExpiryConfiguration : Serializable { + var frequencyMilli: Long = 30000L var maxPollRecord: Int = 1000 } open class ShutDownConfiguration : Serializable { + var waitMill: Long = 30000L } open class CleanConfiguration : Serializable { + var frequencyMilli: Long = 30000L var expiredRecordsHoldDays: Int = 5 } open class UpdateStateRequest : Serializable { + lateinit var id: String var group: String? = null var state: String? = null diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt index d5ec0233a..4ab399f54 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -47,46 +47,46 @@ open class KafkaMessagePrioritizationConsumer( open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier<ByteArray, ByteArray>( + return object : KafkaStreamConsumerFunction { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier<ByteArray, ByteArray>( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + kafkaConsumerConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - kafkaConsumerConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - // Output will be sent to the group-output topic from Processor API - return topology + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } } } - } suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt index 20da2c28c..a0b2cf462 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -35,6 +35,7 @@ open class NatsMessagePrioritizationConsumer( private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, private val natsMessagePrioritizationService: MessagePrioritizationService ) { + private val log = logger(NatsMessagePrioritizationConsumer::class) lateinit var bluePrintNatsService: BluePrintNatsService diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index a6963d83f..f4602a810 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -165,7 +165,7 @@ abstract class AbstractMessagePrioritizationService( if (!messages.isNullOrEmpty()) { try { /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ + Populate New Message and Update status with Prioritized, Forward the message to next processor */ handleAggregation(messages) } catch (e: Exception) { val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" @@ -180,7 +180,8 @@ abstract class AbstractMessagePrioritizationService( } catch (sendException: Exception) { log.error( "failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", e + "${sendException.message}", + e ) } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt index aaefcc773..529d773a4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -30,17 +30,18 @@ import org.springframework.stereotype.Service open class MessagePrioritizationSchedulerService( private val messagePrioritizationService: MessagePrioritizationService ) { + private val log = logger(MessagePrioritizationSchedulerService::class) @Volatile var keepGoing = true /** This is sample scheduler implementation used during starting application with configuration. - @EventListener(ApplicationReadyEvent::class) - open fun init() = runBlocking { - log.info("Starting PrioritizationListeners...") - startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) - } + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } */ open suspend fun startScheduling() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt index dde8d95e0..ed16fd44f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -65,19 +65,19 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? { return prioritizationMessageRepository.findByExpiredDate( @@ -87,11 +87,11 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndExpiredDate( - group, - expiryDate, PageRequest.of(0, count) - ) - } + return prioritizationMessageRepository.findByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } override suspend fun getCorrelatedMessages( group: String, diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt index fb35df75b..7ab0be098 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -44,39 +44,39 @@ object MessageCorrelationUtils { /** Assumption is message is of same group and checking for required types **/ fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?): - CorrelationCheckResponse { + CorrelationCheckResponse { - return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { + return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { - val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } - if (!unknownMessageTypes.isNullOrEmpty()) { - throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") - } + val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } + if (!unknownMessageTypes.isNullOrEmpty()) { + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + } - val copyTypes = types.toTypedArray().copyOf().toMutableList() + val copyTypes = types.toTypedArray().copyOf().toMutableList() - val filteredMessage = collectedMessages.filter { - !it.correlationId.isNullOrBlank() && + val filteredMessage = collectedMessages.filter { + !it.correlationId.isNullOrBlank() && types.contains(it.type) - } - var correlatedKeys: MutableSet<String> = mutableSetOf() - if (filteredMessage.isNotEmpty()) { - val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } - val foundType = correlatedMap.keys.map { it.type } - copyTypes.removeAll(foundType) - correlatedKeys = correlatedMap.keys.map { - it.correlationId - }.toMutableSet() - } - /** Check if any Types missing and same correlation id for all types */ - return if (copyTypes.isEmpty()) { - if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) - else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } + var correlatedKeys: MutableSet<String> = mutableSetOf() + if (filteredMessage.isNotEmpty()) { + val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } + val foundType = correlatedMap.keys.map { it.type } + copyTypes.removeAll(foundType) + correlatedKeys = correlatedMap.keys.map { + it.correlationId + }.toMutableSet() + } + /** Check if any Types missing and same correlation id for all types */ + return if (copyTypes.isEmpty()) { + if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) + else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } else { + CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + } } else { - CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + return correlatedMessages(collectedMessages) } - } else { - return correlatedMessages(collectedMessages) } - } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index af8d902cd..286a9b5c1 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -61,43 +61,45 @@ import kotlin.test.assertNotNull @DataJpaTest @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, + classes = [ + BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, - MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class + ] ) @TestPropertySource( properties = - [ - "spring.jpa.show-sql=false", - "spring.jpa.properties.hibernate.show_sql=false", - "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", - - "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", - "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", - "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", - - // To send initial test message - "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", - - "blueprintsprocessor.nats.cds-controller.type=token-auth", - "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", - "blueprintsprocessor.nats.cds-controller.token=tokenAuth" - ] + [ + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" + ] ) open class MessagePrioritizationConsumerTest { |