aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritization/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritization/src/test')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt350
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt65
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt132
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml42
4 files changed, 589 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
new file mode 100644
index 000000000..286a9b5c1
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -0,0 +1,350 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import io.mockk.coEvery
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.context.ApplicationContext
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@DataJpaTest
+@DirtiesContext
+@ContextConfiguration(
+ classes = [
+ BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
+ ]
+)
+@TestPropertySource(
+ properties =
+ [
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
+ ]
+)
+open class MessagePrioritizationConsumerTest {
+
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
+ @Autowired
+ lateinit var applicationContext: ApplicationContext
+
+ @Autowired
+ lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Autowired
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
+
+ @Autowired
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+
+ @Before
+ fun setup() {
+ BluePrintDependencyService.inject(applicationContext)
+ }
+
+ @Test
+ fun testBluePrintKafkaJDBCKeyStore() {
+ runBlocking {
+ assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
+
+ val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
+ .instance(MessagePrioritizationStateService::class)
+ assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
+
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
+ val message = messagePrioritizationService.saveMessage(it)
+ val repoResult = messagePrioritizationService.getMessage(message.id)
+ assertNotNull(repoResult, "failed to get inserted message.")
+ }
+ }
+ }
+
+ @Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
+ @Test
+ fun testStartConsuming() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+
+ val streamingConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
+ assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyStreamingConsumerService = spyk(streamingConsumerService)
+ coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
+ coEvery { spyStreamingConsumerService.shutDown() } returns Unit
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
+ )
+ val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
+
+ // Test Topology
+ val kafkaStreamConsumerFunction =
+ spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
+ assertNotNull(topology, "failed to get create topology")
+
+ every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
+ spyMessagePrioritizationConsumer.startConsuming(configuration)
+ spyMessagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ @Test
+ fun testSchedulerService() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
+ launch {
+ messagePrioritizationSchedulerService.startScheduling()
+ }
+ launch {
+ /** To debug increase the delay time */
+ delay(20)
+ messagePrioritizationSchedulerService.shutdownScheduling()
+ }
+ }
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ // @Test
+ fun testKafkaMessagePrioritizationConsumer() {
+ runBlocking {
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
+
+ /** Send sample message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+ }
+ delay(10000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
new file mode 100644
index 000000000..22c399608
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
+import org.springframework.stereotype.Service
+import javax.sql.DataSource
+
+@Configuration
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
+@EnableAutoConfiguration
+open class TestDatabaseConfiguration {
+
+ @Bean("primaryDBLibGenericService")
+ open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
+ return PrimaryDBLibGenericService(
+ NamedParameterJdbcTemplate(dataSource)
+ )
+ }
+}
+
+/* Sample Prioritization Listener, used during Application startup
+@Component
+open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
+
+ private val log = logger(SamplePrioritizationListeners::class)
+
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer
+ .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+
+ @PreDestroy
+ open fun destroy() = runBlocking {
+ log.info("Shutting down PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer.shutDown()
+ }
+}
+ */
+
+@Service
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
new file mode 100644
index 000000000..73d3738e5
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
+
+class MessageCorrelationUtilsTest {
+
+ @Test
+ fun testCorrelationKeysReordered() {
+
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key2=value2,key1=value1"
+ )
+
+ val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
+ multipleMessages.add(message1)
+ multipleMessages.add(message2)
+ val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
+ assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
+ }
+
+ @Test
+ fun differentTypesWithSameCorrelationMessages() {
+ /** With Types **/
+ /* Assumption is Same group with different types */
+ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+ val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2")
+ )
+ assertTrue(
+ differentTypesWithSameCorrelationMessagesResponse.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResponse"
+ )
+
+ /* Assumption is Same group with different types and one missing expected types,
+ In this case type-3 message is missing */
+ val differentTypesWithSameCorrelationMessagesResWithMissingType =
+ MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3")
+ )
+ assertTrue(
+ !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
+ )
+ }
+
+ @Test
+ fun withSameCorrelationMessagesWithIgnoredTypes() {
+ /** With ignoring Types */
+ /** Assumption is only one message received */
+ val withSameCorrelationOneMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+ val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationOneMessages, null
+ )
+ assertTrue(
+ !withSameCorrelationOneMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
+
+ /** Assumption is two message received for same group with same correlation */
+ val withSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+ val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationMessages, null
+ )
+ assertTrue(
+ withSameCorrelationMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
+ }
+
+ @Test
+ fun differentTypesWithDifferentCorrelationMessage() {
+ /** Assumption is two message received for same group with different expected types and different correlation */
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-1", "key1=value1,key2=value3"
+ )
+ val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
+ differentTypesWithDifferentCorrelationMessage.add(message1)
+ differentTypesWithDifferentCorrelationMessage.add(message2)
+ val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithDifferentCorrelationMessage,
+ arrayListOf("type-0", "type-1")
+ )
+ assertTrue(
+ !differentTypesWithDifferentCorrelationMessageResp.correlated,
+ "failed to correlate differentTypesWithDifferentCorrelationMessageResp"
+ )
+ }
+
+ @Test
+ fun testPrioritizationOrdering() {
+ val differentPriorityMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5)
+ val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority()
+ assertNotNull(orderedPriorityMessages, "failed to order the priority messages")
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e3a1f7a01
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>${localPattern}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>