aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion/src/test
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-11-11 19:35:39 -0500
committerBrinda Santh <bs2796@att.com>2019-11-13 15:18:32 -0500
commitada372a56d25b41e5a0926a18bf911d20102810f (patch)
treedac9be4e51fc2e74aefde4e1cf4222267539eee8 /ms/blueprintsprocessor/functions/message-prioritizaion/src/test
parenta0407eb91a2424f847e188796328871b3a339c93 (diff)
Add message prioritization module
Kafka streams based solution for message prioritization using database store. Implement initial Abstract Processors, Puntuations and sample Topology for easy plug and play based on situations Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I9c135604574cc3c642186545e076d6a7c60048d4
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src/test')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt175
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt57
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt98
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml42
4 files changed, 372 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
new file mode 100644
index 000000000..bd99f72d0
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import io.mockk.coEvery
+import io.mockk.every
+import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.context.ApplicationContext
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DataJpaTest
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
+@TestPropertySource(properties =
+[
+ "spring.jpa.show-sql=true",
+ "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+])
+open class MessagePrioritizationConsumerTest {
+
+ @Autowired
+ lateinit var applicationContext: ApplicationContext
+
+ @Autowired
+ lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Before
+ fun setup() {
+ BluePrintDependencyService.inject(applicationContext)
+ }
+
+ @Test
+ fun testBluePrintKafkaJDBCKeyStore() {
+ runBlocking {
+ assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
+
+ val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
+ .instance(MessagePrioritizationStateService::class)
+ assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
+
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
+ val message = messagePrioritizationService.saveMessage(it)
+ val repoResult = messagePrioritizationService.getMessage(message.id)
+ assertNotNull(repoResult, "failed to get inserted message.")
+ }
+ }
+ }
+
+ @Test
+ fun testStartConsuming() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+
+ val streamingConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(configuration.inputTopicSelector)
+ assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyStreamingConsumerService = spyk(streamingConsumerService)
+ coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
+ coEvery { spyStreamingConsumerService.shutDown() } returns Unit
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
+
+
+ // Test Topology
+ val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
+ assertNotNull(topology, "failed to get create topology")
+
+ every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
+ spyMessagePrioritizationConsumer.startConsuming(configuration)
+ spyMessagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testMessagePrioritizationConsumer() {
+ runBlocking {
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ /** Send sample message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+ }
+ delay(10000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
new file mode 100644
index 000000000..4e3eb191b
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.PrimaryDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
+import org.springframework.stereotype.Service
+import javax.sql.DataSource
+
+@Configuration
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
+@EnableAutoConfiguration
+open class TestDatabaseConfiguration {
+
+ @Bean("primaryDBLibGenericService")
+ open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
+ return PrimaryDBLibGenericService(NamedParameterJdbcTemplate(dataSource))
+ }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ else -> null
+ }
+ }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+
+@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+open class DefaultMessageOutputProcessor : MessageOutputProcessor() \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
new file mode 100644
index 000000000..b470db909
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import kotlin.test.assertTrue
+
+class MessageCorrelationUtilsTest {
+
+ @Test
+ fun testCorrelationKeysReordered() {
+
+ val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2")
+ val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key2=value2,key1=value1")
+
+ val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
+ multipleMessages.add(message1)
+ multipleMessages.add(message2)
+ val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
+ assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
+ }
+
+ @Test
+ fun differentTypesWithSameCorrelationMessages() {
+ /** With Types **/
+ /* Assumption is Same group with different types */
+ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+ val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2"))
+ assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResponse")
+
+ /* Assumption is Same group with different types and one missing expected types,
+ In this case type-3 message is missing */
+ val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3"))
+ assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType")
+ }
+
+ @Test
+ fun withSameCorrelationMessagesWithIgnoredTypes() {
+ /** With ignoring Types */
+ /** Assumption is only one message received */
+ val withSameCorrelationOneMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+ val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationOneMessages, null)
+ assertTrue(!withSameCorrelationOneMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp")
+
+ /** Assumption is two message received for same group with same correlation */
+ val withSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+ val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationMessages, null)
+ assertTrue(withSameCorrelationMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp")
+ }
+
+ @Test
+ fun differentTypesWithDifferentCorrelationMessage() {
+ /** Assumption is two message received for same group with different expected types and different correlation */
+ val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2")
+ val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-1", "key1=value1,key2=value3")
+ val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
+ differentTypesWithDifferentCorrelationMessage.add(message1)
+ differentTypesWithDifferentCorrelationMessage.add(message2)
+ val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithDifferentCorrelationMessage,
+ arrayListOf("type-0", "type-1"))
+ assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated,
+ "failed to correlate differentTypesWithDifferentCorrelationMessageResp")
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e3a1f7a01
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>${localPattern}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>