diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api')
11 files changed, 468 insertions, 22 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml index 340f2c618..ece1b0ac2 100755 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml @@ -42,6 +42,7 @@ <artifactId>proto-definition</artifactId> <version>${project.version}</version> </dependency> + <dependency> <groupId>org.onap.ccsdk.cds.controllerblueprints</groupId> <artifactId>blueprint-core</artifactId> @@ -59,6 +60,35 @@ <artifactId>h2</artifactId> <scope>test</scope> </dependency> - </dependencies> + <!-- For Message libraries --> + <dependency> + <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> + <artifactId>message-lib</artifactId> + </dependency> + + <!-- For spring-kafka --> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + + <!-- Apache Kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </project> diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index eff977348..60016fb98 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -17,15 +17,17 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import com.fasterxml.jackson.databind.JsonNode +import io.swagger.annotations.Api import io.swagger.annotations.ApiOperation +import io.swagger.annotations.ApiParam import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.beans.factory.annotation.Autowired -import org.springframework.http.HttpStatus import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.http.codec.multipart.FilePart @@ -34,43 +36,59 @@ import org.springframework.web.bind.annotation.* @RestController @RequestMapping("/api/v1/execution-service") +@Api(value = "/api/v1/execution-service", + description = "Interaction with CBA.") open class ExecutionServiceController { @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler - @RequestMapping(path = ["/ping"], method = [RequestMethod.GET], produces = [MediaType.APPLICATION_JSON_VALUE]) + @RequestMapping(path = ["/health-check"], + method = [RequestMethod.GET], + produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody - fun ping(): String = runBlocking { - "Success" + @ApiOperation(value = "Health Check", hidden = true) + fun executionServiceControllerHealthCheck(): JsonNode = runBlocking { + JacksonUtils.getJsonNode("Success") } @PostMapping(path = ["/upload"], consumes = [MediaType.MULTIPART_FORM_DATA_VALUE]) - @ApiOperation(value = "Upload CBA", notes = "Takes a File and load it in the runtime database") @ResponseBody @PreAuthorize("hasRole('USER')") - fun upload(@RequestPart("file") filePart: FilePart): String = runBlocking { - executionServiceHandler.upload(filePart) + @ApiOperation(value = "Upload a CBA", + notes = "Upload the CBA package. This will also run validation on the CBA.", + produces = MediaType.APPLICATION_JSON_VALUE) + fun upload(@ApiParam(value = "The ZIP file containing the overall CBA package.", required = true) + @RequestPart("file") filePart: FilePart): JsonNode = runBlocking { + JacksonUtils.getJsonNode(executionServiceHandler.upload(filePart)) } @DeleteMapping("/name/{name}/version/{version}") - @Throws(BluePrintException::class) + @ApiOperation(value = "Delete a CBA", + notes = "Delete the CBA package identified by its name and version.", + produces = MediaType.APPLICATION_JSON_VALUE) @PreAuthorize("hasRole('USER')") - fun deleteBlueprint(@PathVariable(value = "name") name: String, + fun deleteBlueprint(@ApiParam(value = "Name of the CBA.", required = true) + @PathVariable(value = "name") name: String, + @ApiParam(value = "Version of the CBA.", required = true) @PathVariable(value = "version") version: String) = runBlocking { executionServiceHandler.remove(name, version) } @RequestMapping(path = ["/process"], method = [RequestMethod.POST], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ApiOperation(value = "Resolve Resource Mappings", - notes = "Takes the blueprint information and process as per the payload") + @ApiOperation(value = "Execute a CBA workflow (action)", + notes = "Execute the appropriate CBA's action based on the ExecutionServiceInput object passed as input.", + produces = MediaType.APPLICATION_JSON_VALUE, + response = ExecutionServiceOutput::class) @ResponseBody @PreAuthorize("hasRole('USER')") - fun process(@RequestBody executionServiceInput: ExecutionServiceInput): ResponseEntity<ExecutionServiceOutput> = runBlocking { - if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { - throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") + fun process(@ApiParam(value = "ExecutionServiceInput payload.", required = true) + @RequestBody executionServiceInput: ExecutionServiceInput): ResponseEntity<ExecutionServiceOutput> = + runBlocking { + if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { + throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") + } + val processResult = executionServiceHandler.doProcess(executionServiceInput) + ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } - val processResult = executionServiceHandler.doProcess(executionServiceInput) - ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) - } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt new file mode 100644 index 000000000..a04a79921 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt @@ -0,0 +1,47 @@ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer + +@Configuration +open class MessagingConfig { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") + lateinit var groupId: String + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? { + val configProperties = hashMapOf<String, Any>() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name + configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java)) + } + + /** + * Creation of a Kafka MessageListener Container + * + * @return KafkaListener instance. + */ + @Bean + open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { + val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() + factory.consumerFactory = consumerFactory() + return factory + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt new file mode 100644 index 000000000..1d219a83e --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt @@ -0,0 +1,74 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import kotlinx.coroutines.async +import kotlinx.coroutines.runBlocking +import org.apache.commons.lang3.builder.ToStringBuilder +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.stereotype.Service + +@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true") +@Service +open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler) { + + private val log = LoggerFactory.getLogger(MessagingController::class.java)!! + + companion object { + // TODO PREFIX should be retrieved from model or from request. + const val PREFIX = "self-service-api" + const val EXECUTION_STATUS = 200 + } + + @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) + open fun receive(input: ExecutionServiceInput) { + + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + + runBlocking { + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + + // Process the message. + async { + processMessage(input) + } + } + } + + private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) { + + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + + if (executionServiceOutput.status.code == EXECUTION_STATUS) { + val bluePrintMessageClientService = propertyService + .blueprintMessageClientService(PREFIX) + + val payload = executionServiceOutput.payload + + log.info("The payload to publish is {}", payload) + + bluePrintMessageClientService.sendMessage(payload) + } + else { + log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage) + } + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt index fd764d78f..e084c60cf 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt @@ -23,6 +23,8 @@ import io.grpc.testing.GrpcServerRule import org.junit.Rule import org.junit.Test import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile @@ -33,6 +35,7 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.FileChunk import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -44,7 +47,9 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @EnableAutoConfiguration @DirtiesContext -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class, + MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)]) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintManagementGRPCHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt index a84bf5b5f..ce5acd400 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -43,7 +44,8 @@ import kotlin.test.BeforeTest @RunWith(SpringRunner::class) @DirtiesContext @EnableAutoConfiguration -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintProcessingGRPCHandlerTest { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 9cbd898dc..65b41262b 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.security.SecurityProperties import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.core.io.ByteArrayResource import org.springframework.http.client.MultipartBodyBuilder import org.springframework.test.context.ContextConfiguration @@ -49,7 +50,8 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @WebFluxTest @ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class]) -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], + excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) @TestPropertySource(locations = ["classpath:application-test.properties"]) class ExecutionServiceHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt new file mode 100644 index 000000000..f7459f522 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt @@ -0,0 +1,211 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib + +import com.fasterxml.jackson.databind.node.ObjectNode +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.runBlocking +import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController +import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.autoconfigure.security.SecurityProperties +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.core.io.ByteArrayResource +import org.springframework.http.client.MultipartBodyBuilder +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.annotation.PartitionOffset +import org.springframework.kafka.annotation.TopicPartition +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import org.springframework.test.web.reactive.server.WebTestClient +import org.springframework.test.web.reactive.server.returnResult +import org.springframework.web.reactive.function.BodyInserters +import java.io.File +import java.nio.file.Files +import java.nio.file.Paths +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@EnableAutoConfiguration +@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@TestPropertySource(locations = ["classpath:application-test.properties"]) +@DirtiesContext +@EmbeddedKafka(ports = [9092]) +@WebFluxTest +class MessagingControllerTest { + + private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!! + + @Autowired + lateinit var controller: MessagingController + + @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}") + lateinit var topicUsedForConsumer: String + + @Autowired + lateinit var kt: KafkaTemplate<String, ExecutionServiceInput> + + @Autowired + lateinit var webTestClient: WebTestClient + + var receivedEvent: String? = null + + @Before + fun setup() { + deleteDir("target", "blueprints") + uploadBluePrint() + } + + @After + fun clean() { + deleteDir("target", "blueprints") + } + + @Test + fun testReceive() { + val samplePayload = "{\n" + + " \"resource-assignment-request\": {\n" + + " \"artifact-name\": [\"hostname\"],\n" + + " \"store-result\": true,\n" + + " \"resource-assignment-properties\" : {\n" + + " \"hostname\": \"demo123\"\n" + + " }\n" + + " }\n" + + " }" + + kt.defaultTopic = topicUsedForConsumer + + val input = ExecutionServiceInput().apply { + commonHeader = CommonHeader().apply { + originatorId = "1" + requestId = "1234" + subRequestId = "1234-1234" + } + + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "golden" + blueprintVersion = "1.0.0" + actionName = "resource-assignment" + mode = "sync" + } + + stepData = StepData().apply { + name = "resource-assignment" + } + + payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode + } + + kt.sendDefault(input) + log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input)) + + Thread.sleep(1000) + } + + @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])]) + fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) { + assertNotNull(event) + } + + private fun uploadBluePrint() { + runBlocking { + val body = MultipartBodyBuilder().apply { + part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) { + override fun getFilename(): String { + return "test-cba.zip" + } + }) + }.build() + + webTestClient + .post() + .uri("/api/v1/execution-service/upload") + .body(BodyInserters.fromMultipartData(body)) + .exchange() + .expectStatus().isOk + .returnResult<String>() + .responseBody + .awaitSingle() + } + } + + private fun loadCbaArchive():File { + return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile() + } + + @Configuration + @EnableKafka + open class ConsumerConfiguration { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") + lateinit var groupId:String + + @Bean + open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? { + val configProperties = hashMapOf<String, Any>() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000 + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), + JsonDeserializer(ExecutionServiceInput::class.java)) + } + + @Bean + open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { + val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() + factory.consumerFactory = consumerFactory2() + return factory + } + } +} + + diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt new file mode 100644 index 000000000..dc1f38a63 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt @@ -0,0 +1,48 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +open class ProducerConfiguration { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + open fun kpf(): ProducerFactory<String, ExecutionServiceInput> { + val configs = HashMap<String, Any>() + configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java + return DefaultKafkaProducerFactory(configs) + } + + @Bean + open fun kt(): KafkaTemplate<String, ExecutionServiceInput> { + return KafkaTemplate<String, ExecutionServiceInput>(kpf()) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties index 6705523df..d532b1582 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties @@ -31,3 +31,12 @@ blueprintsprocessor.blueprintArchivePath=./target/blueprints/archive # Python executor blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints + +# Kafka-message-lib Configuration +blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true +blueprintsprocessor.messageclient.self-service-api.topic=producer.t +blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t +blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id +blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip Binary files differnew file mode 100644 index 000000000..23070380c --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip |