aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-07-08 13:37:11 +0000
committerGerrit Code Review <gerrit@onap.org>2019-07-08 13:37:11 +0000
commit8607db90d88ab2a1bec4f3fc4be22f16d09d1a29 (patch)
tree10e166defa1f299b7e6b587596db3d2d108a9461 /ms/blueprintsprocessor/modules
parentbf228082d558a2e1c7a93dd2334dfd105483fc2f (diff)
parentb134a815fbb3404e46551f8f2361e6cca6c7728d (diff)
Merge "Kafka Messaging Controller API."
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt5
-rwxr-xr-xms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml25
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt47
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt74
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt7
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt211
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt48
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties9
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zipbin0 -> 20700 bytes
11 files changed, 428 insertions, 6 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 52ac346db..008e92437 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -16,6 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.ProducerConfig.*
import org.apache.kafka.common.serialization.StringSerializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
@@ -27,7 +28,6 @@ import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.SendResult
import org.springframework.util.concurrent.ListenableFutureCallback
-
class KafkaBasicAuthMessageProducerService(
private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
: BlueprintMessageProducerService {
@@ -64,9 +64,8 @@ class KafkaBasicAuthMessageProducerService(
return true
}
-
private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
- log.info("Client Properties : $messageProducerProperties")
+ log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = hashMapOf<String, Any>()
configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml
index 340f2c618..89ad720f6 100755
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml
@@ -42,6 +42,7 @@
<artifactId>proto-definition</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.onap.ccsdk.cds.controllerblueprints</groupId>
<artifactId>blueprint-core</artifactId>
@@ -59,6 +60,30 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- For Message libraries -->
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>message-lib</artifactId>
+ </dependency>
+
+ <!-- For spring-kafka -->
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Apache Kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
new file mode 100644
index 000000000..a04a79921
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
@@ -0,0 +1,47 @@
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.JsonDeserializer
+
+@Configuration
+open class MessagingConfig {
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
+ lateinit var groupId: String
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+ lateinit var bootstrapServers: String
+
+ open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
+ val configProperties = hashMapOf<String, Any>()
+ configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
+ configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+ return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+ }
+
+ /**
+ * Creation of a Kafka MessageListener Container
+ *
+ * @return KafkaListener instance.
+ */
+ @Bean
+ open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
+ val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
+ factory.consumerFactory = consumerFactory()
+ return factory
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
new file mode 100644
index 000000000..1d219a83e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.async
+import kotlinx.coroutines.runBlocking
+import org.apache.commons.lang3.builder.ToStringBuilder
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.stereotype.Service
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
+@Service
+open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler) {
+
+ private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
+
+ companion object {
+ // TODO PREFIX should be retrieved from model or from request.
+ const val PREFIX = "self-service-api"
+ const val EXECUTION_STATUS = 200
+ }
+
+ @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
+ open fun receive(input: ExecutionServiceInput) {
+
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+ runBlocking {
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+ // Process the message.
+ async {
+ processMessage(input)
+ }
+ }
+ }
+
+ private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
+
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+
+ if (executionServiceOutput.status.code == EXECUTION_STATUS) {
+ val bluePrintMessageClientService = propertyService
+ .blueprintMessageClientService(PREFIX)
+
+ val payload = executionServiceOutput.payload
+
+ log.info("The payload to publish is {}", payload)
+
+ bluePrintMessageClientService.sendMessage(payload)
+ }
+ else {
+ log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
index fd764d78f..e084c60cf 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
@@ -23,6 +23,8 @@ import io.grpc.testing.GrpcServerRule
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
@@ -33,6 +35,7 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.FileChunk
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
@@ -44,7 +47,9 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@EnableAutoConfiguration
@DirtiesContext
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+ excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class,
+ MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)])
@TestPropertySource(locations = ["classpath:application-test.properties"])
class BluePrintManagementGRPCHandlerTest {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
index f8b972e64..5072b3c6a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
@@ -45,7 +46,8 @@ import kotlin.test.BeforeTest
@RunWith(SpringRunner::class)
@DirtiesContext
@EnableAutoConfiguration
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+ excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
@TestPropertySource(locations = ["classpath:application-test.properties"])
class BluePrintProcessingGRPCHandlerTest {
private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
index 9cbd898dc..65b41262b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
@@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.security.SecurityProperties
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
import org.springframework.core.io.ByteArrayResource
import org.springframework.http.client.MultipartBodyBuilder
import org.springframework.test.context.ContextConfiguration
@@ -49,7 +50,8 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@WebFluxTest
@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+ excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
@TestPropertySource(locations = ["classpath:application-test.properties"])
class ExecutionServiceHandlerTest {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
new file mode 100644
index 000000000..f7459f522
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
@@ -0,0 +1,211 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+import kotlinx.coroutines.reactive.awaitSingle
+import kotlinx.coroutines.runBlocking
+import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.autoconfigure.security.SecurityProperties
+import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.io.ByteArrayResource
+import org.springframework.http.client.MultipartBodyBuilder
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.kafka.annotation.PartitionOffset
+import org.springframework.kafka.annotation.TopicPartition
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.test.context.EmbeddedKafka
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import org.springframework.test.web.reactive.server.WebTestClient
+import org.springframework.test.web.reactive.server.returnResult
+import org.springframework.web.reactive.function.BodyInserters
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Paths
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@EnableAutoConfiguration
+@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+@DirtiesContext
+@EmbeddedKafka(ports = [9092])
+@WebFluxTest
+class MessagingControllerTest {
+
+ private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!!
+
+ @Autowired
+ lateinit var controller: MessagingController
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}")
+ lateinit var topicUsedForConsumer: String
+
+ @Autowired
+ lateinit var kt: KafkaTemplate<String, ExecutionServiceInput>
+
+ @Autowired
+ lateinit var webTestClient: WebTestClient
+
+ var receivedEvent: String? = null
+
+ @Before
+ fun setup() {
+ deleteDir("target", "blueprints")
+ uploadBluePrint()
+ }
+
+ @After
+ fun clean() {
+ deleteDir("target", "blueprints")
+ }
+
+ @Test
+ fun testReceive() {
+ val samplePayload = "{\n" +
+ " \"resource-assignment-request\": {\n" +
+ " \"artifact-name\": [\"hostname\"],\n" +
+ " \"store-result\": true,\n" +
+ " \"resource-assignment-properties\" : {\n" +
+ " \"hostname\": \"demo123\"\n" +
+ " }\n" +
+ " }\n" +
+ " }"
+
+ kt.defaultTopic = topicUsedForConsumer
+
+ val input = ExecutionServiceInput().apply {
+ commonHeader = CommonHeader().apply {
+ originatorId = "1"
+ requestId = "1234"
+ subRequestId = "1234-1234"
+ }
+
+ actionIdentifiers = ActionIdentifiers().apply {
+ blueprintName = "golden"
+ blueprintVersion = "1.0.0"
+ actionName = "resource-assignment"
+ mode = "sync"
+ }
+
+ stepData = StepData().apply {
+ name = "resource-assignment"
+ }
+
+ payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode
+ }
+
+ kt.sendDefault(input)
+ log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
+
+ Thread.sleep(1000)
+ }
+
+ @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
+ fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) {
+ assertNotNull(event)
+ }
+
+ private fun uploadBluePrint() {
+ runBlocking {
+ val body = MultipartBodyBuilder().apply {
+ part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
+ override fun getFilename(): String {
+ return "test-cba.zip"
+ }
+ })
+ }.build()
+
+ webTestClient
+ .post()
+ .uri("/api/v1/execution-service/upload")
+ .body(BodyInserters.fromMultipartData(body))
+ .exchange()
+ .expectStatus().isOk
+ .returnResult<String>()
+ .responseBody
+ .awaitSingle()
+ }
+ }
+
+ private fun loadCbaArchive():File {
+ return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile()
+ }
+
+ @Configuration
+ @EnableKafka
+ open class ConsumerConfiguration {
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+ lateinit var bootstrapServers: String
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
+ lateinit var groupId:String
+
+ @Bean
+ open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? {
+ val configProperties = hashMapOf<String, Any>()
+ configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+ configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000
+
+ return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
+ JsonDeserializer(ExecutionServiceInput::class.java))
+ }
+
+ @Bean
+ open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
+ val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
+ factory.consumerFactory = consumerFactory2()
+ return factory
+ }
+ }
+}
+
+
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
new file mode 100644
index 000000000..dc1f38a63
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.support.serializer.JsonSerializer
+
+@Configuration
+open class ProducerConfiguration {
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+ lateinit var bootstrapServers: String
+
+ open fun kpf(): ProducerFactory<String, ExecutionServiceInput> {
+ val configs = HashMap<String, Any>()
+ configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
+ return DefaultKafkaProducerFactory(configs)
+ }
+
+ @Bean
+ open fun kt(): KafkaTemplate<String, ExecutionServiceInput> {
+ return KafkaTemplate<String, ExecutionServiceInput>(kpf())
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
index 6705523df..d532b1582 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
@@ -31,3 +31,12 @@ blueprintsprocessor.blueprintArchivePath=./target/blueprints/archive
# Python executor
blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints
blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
+
+# Kafka-message-lib Configuration
+blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true
+blueprintsprocessor.messageclient.self-service-api.topic=producer.t
+blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
+blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
new file mode 100644
index 000000000..23070380c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
Binary files differ