aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2020-09-22 19:05:47 +0000
committerGerrit Code Review <gerrit@onap.org>2020-09-22 19:05:47 +0000
commite515a85125e5487ecd3f7127b6af0b192d4a0993 (patch)
tree1fe08a7d54c28419a44845634037d77f8c434a4b /ms/blueprintsprocessor/functions/message-prioritizaion
parent85d8f7f7701570fb745c3d6360d500f97bdf1b25 (diff)
parent1072867dfac0df993cbd3e44bcc11a5cac7465fd (diff)
Merge "Enabling Code Formatter"
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt74
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt1
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt5
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt11
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt30
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt54
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt68
8 files changed, 132 insertions, 119 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 424929b82..65b7644a8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import java.io.Serializable
object MessageActionConstants {
+
const val PRIORITIZE = "prioritize"
}
@@ -33,6 +34,7 @@ enum class MessageState(val id: String) {
}
open class PrioritizationConfiguration : Serializable {
+
lateinit var expiryConfiguration: ExpiryConfiguration
lateinit var shutDownConfiguration: ShutDownConfiguration
lateinit var cleanConfiguration: CleanConfiguration
@@ -41,12 +43,14 @@ open class PrioritizationConfiguration : Serializable {
}
open class KafkaConfiguration : Serializable {
+
lateinit var inputTopicSelector: String // Consumer Configuration Selector
lateinit var expiredTopic: String // Publish Configuration Selector
lateinit var outputTopic: String // Publish Configuration Selector
}
open class NatsConfiguration : Serializable {
+
lateinit var connectionSelector: String // Consumer Configuration Selector
lateinit var inputSubject: String // Publish Configuration Selector
lateinit var expiredSubject: String // Publish Configuration Selector
@@ -54,20 +58,24 @@ open class NatsConfiguration : Serializable {
}
open class ExpiryConfiguration : Serializable {
+
var frequencyMilli: Long = 30000L
var maxPollRecord: Int = 1000
}
open class ShutDownConfiguration : Serializable {
+
var waitMill: Long = 30000L
}
open class CleanConfiguration : Serializable {
+
var frequencyMilli: Long = 30000L
var expiredRecordsHoldDays: Int = 5
}
open class UpdateStateRequest : Serializable {
+
lateinit var id: String
var group: String? = null
var state: String? = null
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
index d5ec0233a..4ab399f54 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
@@ -47,46 +47,46 @@ open class KafkaMessagePrioritizationConsumer(
open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
KafkaStreamConsumerFunction {
- return object : KafkaStreamConsumerFunction {
-
- val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
- ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
-
- override suspend fun createTopology(
- messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?
- ): Topology {
-
- val topology = Topology()
- val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
-
- val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
- log.info("Consuming prioritization topics($topics)")
-
- topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- bluePrintProcessorSupplier<ByteArray, ByteArray>(
+ return object : KafkaStreamConsumerFunction {
+
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+ override suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology {
+
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
+ log.info("Consuming prioritization topics($topics)")
+
+ topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
+
+ topology.addProcessor(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ bluePrintProcessorSupplier<ByteArray, ByteArray>(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
+ ),
+ MessagePrioritizationConstants.SOURCE_INPUT
+ )
+
+ /** To receive completed and error messages */
+ topology.addSink(
+ MessagePrioritizationConstants.SINK_OUTPUT,
+ kafkaConsumerConfiguration.outputTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- ),
- MessagePrioritizationConstants.SOURCE_INPUT
- )
-
- /** To receive completed and error messages */
- topology.addSink(
- MessagePrioritizationConstants.SINK_OUTPUT,
- kafkaConsumerConfiguration.outputTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- // Output will be sent to the group-output topic from Processor API
- return topology
+ )
+
+ // Output will be sent to the group-output topic from Processor API
+ return topology
+ }
}
}
- }
suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
index 20da2c28c..a0b2cf462 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
@@ -35,6 +35,7 @@ open class NatsMessagePrioritizationConsumer(
private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
private val natsMessagePrioritizationService: MessagePrioritizationService
) {
+
private val log = logger(NatsMessagePrioritizationConsumer::class)
lateinit var bluePrintNatsService: BluePrintNatsService
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index a6963d83f..f4602a810 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -165,7 +165,7 @@ abstract class AbstractMessagePrioritizationService(
if (!messages.isNullOrEmpty()) {
try {
/** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
handleAggregation(messages)
} catch (e: Exception) {
val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
@@ -180,7 +180,8 @@ abstract class AbstractMessagePrioritizationService(
} catch (sendException: Exception) {
log.error(
"failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}", e
+ "${sendException.message}",
+ e
)
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
index aaefcc773..529d773a4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -30,17 +30,18 @@ import org.springframework.stereotype.Service
open class MessagePrioritizationSchedulerService(
private val messagePrioritizationService: MessagePrioritizationService
) {
+
private val log = logger(MessagePrioritizationSchedulerService::class)
@Volatile
var keepGoing = true
/** This is sample scheduler implementation used during starting application with configuration.
- @EventListener(ApplicationReadyEvent::class)
- open fun init() = runBlocking {
- log.info("Starting PrioritizationListeners...")
- startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
- }
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
*/
open suspend fun startScheduling() {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
index dde8d95e0..ed16fd44f 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
@@ -65,19 +65,19 @@ open class MessagePrioritizationStateServiceImpl(
override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
- group,
- states, Date(), PageRequest.of(0, count)
- )
- }
+ return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
- group,
- states, Date(), PageRequest.of(0, count)
- )
- }
+ return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
return prioritizationMessageRepository.findByExpiredDate(
@@ -87,11 +87,11 @@ open class MessagePrioritizationStateServiceImpl(
override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndExpiredDate(
- group,
- expiryDate, PageRequest.of(0, count)
- )
- }
+ return prioritizationMessageRepository.findByGroupAndExpiredDate(
+ group,
+ expiryDate, PageRequest.of(0, count)
+ )
+ }
override suspend fun getCorrelatedMessages(
group: String,
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
index fb35df75b..7ab0be098 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
@@ -44,39 +44,39 @@ object MessageCorrelationUtils {
/** Assumption is message is of same group and checking for required types **/
fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?):
- CorrelationCheckResponse {
+ CorrelationCheckResponse {
- return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
+ return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
- val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
- if (!unknownMessageTypes.isNullOrEmpty()) {
- throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
- }
+ val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
+ if (!unknownMessageTypes.isNullOrEmpty()) {
+ throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
+ }
- val copyTypes = types.toTypedArray().copyOf().toMutableList()
+ val copyTypes = types.toTypedArray().copyOf().toMutableList()
- val filteredMessage = collectedMessages.filter {
- !it.correlationId.isNullOrBlank() &&
+ val filteredMessage = collectedMessages.filter {
+ !it.correlationId.isNullOrBlank() &&
types.contains(it.type)
- }
- var correlatedKeys: MutableSet<String> = mutableSetOf()
- if (filteredMessage.isNotEmpty()) {
- val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
- val foundType = correlatedMap.keys.map { it.type }
- copyTypes.removeAll(foundType)
- correlatedKeys = correlatedMap.keys.map {
- it.correlationId
- }.toMutableSet()
- }
- /** Check if any Types missing and same correlation id for all types */
- return if (copyTypes.isEmpty()) {
- if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
- else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
+ }
+ var correlatedKeys: MutableSet<String> = mutableSetOf()
+ if (filteredMessage.isNotEmpty()) {
+ val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
+ val foundType = correlatedMap.keys.map { it.type }
+ copyTypes.removeAll(foundType)
+ correlatedKeys = correlatedMap.keys.map {
+ it.correlationId
+ }.toMutableSet()
+ }
+ /** Check if any Types missing and same correlation id for all types */
+ return if (copyTypes.isEmpty()) {
+ if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
+ else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
+ } else {
+ CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
+ }
} else {
- CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
+ return correlatedMessages(collectedMessages)
}
- } else {
- return correlatedMessages(collectedMessages)
}
- }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index af8d902cd..286a9b5c1 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -61,43 +61,45 @@ import kotlin.test.assertNotNull
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
+ classes = [
+ BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
- MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
+ ]
)
@TestPropertySource(
properties =
- [
- "spring.jpa.show-sql=false",
- "spring.jpa.properties.hibernate.show_sql=false",
- "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
-
- "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
- "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
- "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
- "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
- "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
- "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
- "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
-
- // To send initial test message
- "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
- "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
- "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
- "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
- "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
-
- "blueprintsprocessor.nats.cds-controller.type=token-auth",
- "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
- "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
- ]
+ [
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
+ ]
)
open class MessagePrioritizationConsumerTest {