diff options
author | prathameshmo <prathamesh.morde@bell.ca> | 2019-06-11 17:47:42 -0400 |
---|---|---|
committer | prathameshmo <prathamesh.morde@bell.ca> | 2019-06-25 17:03:44 -0400 |
commit | b134a815fbb3404e46551f8f2361e6cca6c7728d (patch) | |
tree | 1c2df5a0b010146dc5310b2e64f884fda1eea833 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin | |
parent | a6fae85764a8dfbeba6000a060b8be0f21fb0466 (diff) |
Kafka Messaging Controller API.
Things done-
Addressed review comments.
Logic to consume events and process it.
Added integration testing.
Change-Id: If574a363f9fb8581018cc5a7ba106251a9d8caf1
Issue-ID:CCSDK-1356
Signed-off-by: prathamesh morde <prathamesh.morde@bell.ca>
Signed-off-by: prathameshmo <prathamesh.morde@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin')
5 files changed, 271 insertions, 3 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt index fd764d78f..e084c60cf 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt @@ -23,6 +23,8 @@ import io.grpc.testing.GrpcServerRule import org.junit.Rule import org.junit.Test import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile @@ -33,6 +35,7 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.FileChunk import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -44,7 +47,9 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @EnableAutoConfiguration @DirtiesContext -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class, + MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)]) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintManagementGRPCHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt index f8b972e64..5072b3c6a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -45,7 +46,8 @@ import kotlin.test.BeforeTest @RunWith(SpringRunner::class) @DirtiesContext @EnableAutoConfiguration -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintProcessingGRPCHandlerTest { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 9cbd898dc..65b41262b 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.security.SecurityProperties import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.core.io.ByteArrayResource import org.springframework.http.client.MultipartBodyBuilder import org.springframework.test.context.ContextConfiguration @@ -49,7 +50,8 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @WebFluxTest @ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class]) -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) @TestPropertySource(locations = ["classpath:application-test.properties"]) class ExecutionServiceHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt new file mode 100644 index 000000000..f7459f522 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt @@ -0,0 +1,211 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib + +import com.fasterxml.jackson.databind.node.ObjectNode +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.runBlocking +import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController +import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.autoconfigure.security.SecurityProperties +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.core.io.ByteArrayResource +import org.springframework.http.client.MultipartBodyBuilder +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.annotation.PartitionOffset +import org.springframework.kafka.annotation.TopicPartition +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import org.springframework.test.web.reactive.server.WebTestClient +import org.springframework.test.web.reactive.server.returnResult +import org.springframework.web.reactive.function.BodyInserters +import java.io.File +import java.nio.file.Files +import java.nio.file.Paths +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@EnableAutoConfiguration +@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@TestPropertySource(locations = ["classpath:application-test.properties"]) +@DirtiesContext +@EmbeddedKafka(ports = [9092]) +@WebFluxTest +class MessagingControllerTest { + + private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!! + + @Autowired + lateinit var controller: MessagingController + + @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}") + lateinit var topicUsedForConsumer: String + + @Autowired + lateinit var kt: KafkaTemplate<String, ExecutionServiceInput> + + @Autowired + lateinit var webTestClient: WebTestClient + + var receivedEvent: String? = null + + @Before + fun setup() { + deleteDir("target", "blueprints") + uploadBluePrint() + } + + @After + fun clean() { + deleteDir("target", "blueprints") + } + + @Test + fun testReceive() { + val samplePayload = "{\n" + + " \"resource-assignment-request\": {\n" + + " \"artifact-name\": [\"hostname\"],\n" + + " \"store-result\": true,\n" + + " \"resource-assignment-properties\" : {\n" + + " \"hostname\": \"demo123\"\n" + + " }\n" + + " }\n" + + " }" + + kt.defaultTopic = topicUsedForConsumer + + val input = ExecutionServiceInput().apply { + commonHeader = CommonHeader().apply { + originatorId = "1" + requestId = "1234" + subRequestId = "1234-1234" + } + + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "golden" + blueprintVersion = "1.0.0" + actionName = "resource-assignment" + mode = "sync" + } + + stepData = StepData().apply { + name = "resource-assignment" + } + + payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode + } + + kt.sendDefault(input) + log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input)) + + Thread.sleep(1000) + } + + @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])]) + fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) { + assertNotNull(event) + } + + private fun uploadBluePrint() { + runBlocking { + val body = MultipartBodyBuilder().apply { + part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) { + override fun getFilename(): String { + return "test-cba.zip" + } + }) + }.build() + + webTestClient + .post() + .uri("/api/v1/execution-service/upload") + .body(BodyInserters.fromMultipartData(body)) + .exchange() + .expectStatus().isOk + .returnResult<String>() + .responseBody + .awaitSingle() + } + } + + private fun loadCbaArchive():File { + return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile() + } + + @Configuration + @EnableKafka + open class ConsumerConfiguration { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") + lateinit var groupId:String + + @Bean + open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? { + val configProperties = hashMapOf<String, Any>() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000 + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), + JsonDeserializer(ExecutionServiceInput::class.java)) + } + + @Bean + open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { + val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() + factory.consumerFactory = consumerFactory2() + return factory + } + } +} + + diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt new file mode 100644 index 000000000..dc1f38a63 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt @@ -0,0 +1,48 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +open class ProducerConfiguration { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + open fun kpf(): ProducerFactory<String, ExecutionServiceInput> { + val configs = HashMap<String, Any>() + configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java + return DefaultKafkaProducerFactory(configs) + } + + @Bean + open fun kt(): KafkaTemplate<String, ExecutionServiceInput> { + return KafkaTemplate<String, ExecutionServiceInput>(kpf()) + } +}
\ No newline at end of file |