summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-09-05 18:54:47 +0000
committerGerrit Code Review <gerrit@onap.org>2019-09-05 18:54:47 +0000
commit16dfa234954d54334f73b2a5c0ed3da47ff27c13 (patch)
tree9d3c03cc60d0d247c77b48b2eaafb97b1b1f27b1 /ms/blueprintsprocessor/modules
parentc656c611de770cb531199bf47c35cb41389fc100 (diff)
parenta2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (diff)
Merge "Add Config based blueprint process consumer"
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt23
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt52
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt108
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt58
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt72
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt7
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt66
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt100
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt8
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt215
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt48
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties19
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml (renamed from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml)70
18 files changed, 393 insertions, 470 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 281a970b8..27a444bdc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
class MessageLibConstants {
companion object {
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
- //TODO("Change to .messageconsumer in application.properties")
- const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+ const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index c77cdfdb5..ab04054fe 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -33,8 +33,9 @@ open class MessageConsumerProperties
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
- var consumerTopic: String? = null
- var pollMillSec: Long = 100
+ var clientId: String? = null
+ var topic: String? = null
+ var pollMillSec: Long = 1000
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 076501eab..5a9e61bfd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
@@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService(
configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ if (messageConsumerProperties.clientId != null) {
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
+ }
+ // TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
@@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
/** get to topic names */
- val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
return subscribe(consumerTopic, additionalConfig)
}
@@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService(
override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
+
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
"server(${messageConsumerProperties.bootstrapServers})'s " +
@@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.subscribe(consumerTopic)
log.info("Successfully consumed topic($consumerTopic)")
- val listenerThread = thread(start = true, name = "KafkaConsumer") {
+ thread(start = true, name = "KafkaConsumer") {
keepGoing = true
kafkaConsumer!!.use { kc ->
while (keepGoing) {
@@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService(
}
}
}
+ log.info("message listener shutting down.....")
}
-
}
- log.info("Successfully consumed in thread(${listenerThread})")
return channel
}
override suspend fun shutDown() {
- /** Close the Channel */
- channel.close()
/** stop the polling loop */
keepGoing = false
- if (kafkaConsumer != null) {
- /** sunsubscribe the consumer */
- kafkaConsumer!!.unsubscribe()
- }
+ /** Close the Channel */
+ channel.cancel()
+ /** TO shutdown gracefully, need to wait for the maximum poll time */
+ delay(messageConsumerProperties.pollMillSec)
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 008e92437..1c93bb0fc 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService(
}
fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- log.info("Prepering templates")
if (kafkaTemplate == null) {
kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 18b86b8d8..2b84eaa78 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import io.mockk.every
import io.mockk.spyk
import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -30,12 +31,14 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@@ -43,12 +46,20 @@ import kotlin.test.assertNotNull
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
-["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
- "blueprintsprocessor.messageclient.sample.groupId=sample-group",
- "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
+["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
open class BlueprintMessageConsumerServiceTest {
+ val log = logger(BlueprintMessageConsumerServiceTest::class)
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -90,11 +101,40 @@ open class BlueprintMessageConsumerServiceTest {
val channel = spyBlueprintMessageConsumerService.subscribe(null)
launch {
channel.consumeEach {
- println("Received message : $it")
+ assertTrue(it.startsWith("I am message"), "failed to get the actual message")
}
}
- //delay(100)
+ delay(10)
spyBlueprintMessageConsumerService.shutDown()
}
}
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testKafkaIntegration() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val channel = blueprintMessageConsumerService.subscribe(null)
+ launch {
+ channel.consumeEach {
+ log.info("Consumed Message : $it")
+ }
+ }
+
+ /** Send message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ launch {
+ repeat(5) {
+ delay(1000)
+ blueprintMessageProducerService.sendMessage("this is my message($it)")
+ }
+ }
+ delay(10000)
+ blueprintMessageConsumerService.shutDown()
+ }
+ }
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 0db62c1df..31bcc1517 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -42,7 +42,7 @@ import kotlin.test.assertTrue
BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
"blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
new file mode 100644
index 000000000..b339903c5
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.event.ApplicationReadyEvent
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import javax.annotation.PreDestroy
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
+ havingValue = "true")
+@Service
+open class BluePrintProcessingKafkaConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler) {
+
+ val log = logger(BluePrintProcessingKafkaConsumer::class)
+
+ private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
+
+ companion object {
+ const val CONSUMER_SELECTOR = "self-service-api"
+ const val PRODUCER_SELECTOR = "self-service-api"
+ }
+
+ @EventListener(ApplicationReadyEvent::class)
+ fun setupMessageListener() = runBlocking {
+ try {
+ log.info("Setting up message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR)...")
+
+ /** Get the Message Consumer Service **/
+ blueprintMessageConsumerService = try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(CONSUMER_SELECTOR)
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create consumer service ${e.message}")
+ }
+
+ /** Get the Message Producer Service **/
+ val blueprintMessageProducerService = try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService(PRODUCER_SELECTOR)
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create producer service ${e.message}")
+ }
+
+ launch {
+ /** Subscribe to the consumer topics */
+ val additionalConfig: MutableMap<String, Any> = hashMapOf()
+ val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
+ channel.consumeEach { message ->
+ launch {
+ try {
+ log.trace("Consumed Message : $message")
+ val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+ //TODO("In future, Message publisher configuration vary with respect to request")
+ /** Send the response message */
+ blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ } catch (e: Exception) {
+ log.error("failed in processing the consumed message : $message", e)
+ }
+ }
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed to start message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR) ", e)
+ }
+ }
+
+ @PreDestroy
+ fun shutdownMessageListener() = runBlocking {
+ try {
+ log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR)...")
+ blueprintMessageConsumerService.shutDown()
+ } catch (e: Exception) {
+ log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
+ }
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
deleted file mode 100644
index 17e157d15..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import com.fasterxml.jackson.databind.DeserializationFeature
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
-import org.springframework.kafka.support.serializer.JsonDeserializer
-
-@Configuration
-open class MessagingConfig {
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
- lateinit var groupId: String
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
- lateinit var bootstrapServers: String
-
- open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
- val configProperties = hashMapOf<String, Any>()
- configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
- configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
-
- val deserializer = JsonDeserializer<ExecutionServiceInput>()
- deserializer.setRemoveTypeHeaders(true)
- deserializer.addTrustedPackages("*")
-
- val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java,
- ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
-
- return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
- ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
- }
-
- /**
- * Creation of a Kafka MessageListener Container
- *
- * @return KafkaListener instance.
- */
- @Bean
- open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
- val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
- factory.consumerFactory = consumerFactory()
- return factory
- }
-} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
deleted file mode 100644
index e848ed384..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import kotlinx.coroutines.async
-import kotlinx.coroutines.runBlocking
-import org.apache.commons.lang3.builder.ToStringBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.slf4j.LoggerFactory
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
-import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.stereotype.Service
-
-//TODO("Implement with property service and remove spring bindings")
-@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
-@Service
-open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
- private val executionServiceHandler: ExecutionServiceHandler) {
-
- private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
-
- companion object {
- // TODO PREFIX should be retrieved from model or from request.
- const val PREFIX = "self-service-api"
- const val EXECUTION_STATUS = 200
- }
-
- @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
- open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
-
- runBlocking {
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
-
- // Process the message.
- async {
- processMessage(record.value())
- }.await()
- }
- }
-
- private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
-
- val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-
- if (executionServiceOutput.status.code == EXECUTION_STATUS) {
- val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
-
- val payload = executionServiceOutput.payload
-
- log.info("The payload to publish is {}", payload)
-
- blueprintMessageProducerService.sendMessage(payload)
- } else {
- log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
index ea05e88c0..9629aa4b5 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
@@ -23,8 +23,6 @@ import io.grpc.testing.GrpcServerRule
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
@@ -33,7 +31,6 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.*
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
@@ -45,9 +42,7 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@EnableAutoConfiguration
@DirtiesContext
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
- excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class,
- MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
@TestPropertySource(locations = ["classpath:application-test.properties"])
class BluePrintManagementGRPCHandlerTest {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
index ce5acd400..8bedc9628 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
@@ -1,6 +1,7 @@
/*
* Copyright © 2017-2018 AT&T Intellectual Property.
* Modifications Copyright © 2019 Bell Canada.
+ * Modifications Copyright © 2019 IBM.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
@@ -44,8 +44,8 @@ import kotlin.test.BeforeTest
@RunWith(SpringRunner::class)
@DirtiesContext
@EnableAutoConfiguration
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
- excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor",
+ "org.onap.ccsdk.cds.controllerblueprints"])
@TestPropertySource(locations = ["classpath:application-test.properties"])
class BluePrintProcessingGRPCHandlerTest {
private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt
new file mode 100644
index 000000000..7d43f533f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import io.mockk.coEvery
+import io.mockk.mockk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+class BluePrintProcessingKafkaConsumerTest {
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Test
+ fun testExecutionInputMessageConsumer() {
+ runBlocking {
+ assertNotNull(bluePrintMessageLibPropertyService,
+ "failed to initialise bluePrintMessageLibPropertyService")
+
+ val executionServiceHandle = mockk<ExecutionServiceHandler>()
+
+ coEvery { executionServiceHandle.doProcess(any()) } returns mockk()
+
+ val bluePrintProcessingKafkaConsumer = BluePrintProcessingKafkaConsumer(bluePrintMessageLibPropertyService,
+ executionServiceHandle)
+
+ launch {
+ bluePrintProcessingKafkaConsumer.setupMessageListener()
+ }
+ delay(100)
+ bluePrintProcessingKafkaConsumer.shutdownMessageListener()
+ }
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt
new file mode 100644
index 000000000..fc6c4890c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2019 Bell Canada
+ * Modifications Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.reactive.awaitSingle
+import kotlinx.coroutines.runBlocking
+import org.junit.After
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.autoconfigure.security.SecurityProperties
+import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.core.io.ByteArrayResource
+import org.springframework.http.client.MultipartBodyBuilder
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import org.springframework.test.web.reactive.server.WebTestClient
+import org.springframework.test.web.reactive.server.returnResult
+import org.springframework.web.reactive.function.BodyInserters
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Paths
+import kotlin.test.Test
+
+@RunWith(SpringRunner::class)
+@EnableAutoConfiguration
+@ContextConfiguration(classes = [ExecutionServiceControllerTest::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+@DirtiesContext
+@WebFluxTest
+class ExecutionServiceControllerTest {
+
+ private val log = LoggerFactory.getLogger(ExecutionServiceControllerTest::class.java)!!
+
+ @Autowired
+ lateinit var webTestClient: WebTestClient
+
+ var event: ExecutionServiceInput? = null
+
+ @Before
+ fun setup() {
+ deleteDir("target", "blueprints")
+ }
+
+ @After
+ fun clean() {
+ deleteDir("target", "blueprints")
+ }
+
+ @Test
+ fun uploadBluePrint() {
+ runBlocking {
+ val body = MultipartBodyBuilder().apply {
+ part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
+ override fun getFilename(): String {
+ return "test-cba.zip"
+ }
+ })
+ }.build()
+
+ webTestClient
+ .post()
+ .uri("/api/v1/execution-service/upload")
+ .body(BodyInserters.fromMultipartData(body))
+ .exchange()
+ .expectStatus().isOk
+ .returnResult<String>()
+ .responseBody
+ .awaitSingle()
+ }
+ }
+
+ private fun loadCbaArchive(): File {
+ return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
+ }
+}
+
+
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
index d9e352bff..a480b115b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
@@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.security.SecurityProperties
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
import org.springframework.core.io.ByteArrayResource
import org.springframework.http.client.MultipartBodyBuilder
import org.springframework.test.context.ContextConfiguration
@@ -49,9 +48,10 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@WebFluxTest
-@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
- excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
+@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class,
+ BluePrintCatalogService::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor",
+ "org.onap.ccsdk.cds.controllerblueprints"])
@TestPropertySource(locations = ["classpath:application-test.properties"])
class ExecutionServiceHandlerTest {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
deleted file mode 100644
index facbec585..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
-
-import com.fasterxml.jackson.databind.node.ObjectNode
-import kotlinx.coroutines.reactive.awaitSingle
-import kotlinx.coroutines.runBlocking
-import org.apache.commons.lang.builder.ToStringBuilder
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.After
-import org.junit.Before
-import org.junit.Ignore
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController
-import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration
-import org.springframework.boot.autoconfigure.security.SecurityProperties
-import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.Configuration
-import org.springframework.core.io.ByteArrayResource
-import org.springframework.http.client.MultipartBodyBuilder
-import org.springframework.kafka.annotation.EnableKafka
-import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.kafka.annotation.PartitionOffset
-import org.springframework.kafka.annotation.TopicPartition
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.serializer.JsonDeserializer
-import org.springframework.kafka.test.context.EmbeddedKafka
-import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.ContextConfiguration
-import org.springframework.test.context.TestPropertySource
-import org.springframework.test.context.junit4.SpringRunner
-import org.springframework.test.web.reactive.server.WebTestClient
-import org.springframework.test.web.reactive.server.returnResult
-import org.springframework.web.reactive.function.BodyInserters
-import java.io.File
-import java.nio.file.Files
-import java.nio.file.Paths
-import kotlin.test.assertNotNull
-//FIXME("testReceive method is failing in server build, It is not stable, may be timing issue.")
-@Ignore
-@RunWith(SpringRunner::class)
-@EnableAutoConfiguration
-@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
-@TestPropertySource(locations = ["classpath:application-test.properties"])
-@DirtiesContext
-@EmbeddedKafka(ports = [9092])
-@WebFluxTest
-class MessagingControllerTest {
-
- private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!!
-
- @Autowired
- lateinit var controller: MessagingController
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}")
- lateinit var topicUsedForConsumer: String
-
- @Autowired
- lateinit var kt: KafkaTemplate<String, ExecutionServiceInput>
-
- @Autowired
- lateinit var webTestClient: WebTestClient
-
- var event: ExecutionServiceInput? = null
-
- @Before
- fun setup() {
- deleteDir("target", "blueprints")
- uploadBluePrint()
- }
-
- @After
- fun clean() {
- deleteDir("target", "blueprints")
- }
-
- @Test
- fun testReceive() {
- val samplePayload = "{\n" +
- " \"resource-assignment-request\": {\n" +
- " \"artifact-name\": [\"hostname\"],\n" +
- " \"store-result\": true,\n" +
- " \"resource-assignment-properties\" : {\n" +
- " \"hostname\": \"demo123\"\n" +
- " }\n" +
- " }\n" +
- " }"
-
- kt.defaultTopic = topicUsedForConsumer
-
- val input = ExecutionServiceInput().apply {
- commonHeader = CommonHeader().apply {
- originatorId = "1"
- requestId = "1234"
- subRequestId = "1234-1234"
- }
-
- actionIdentifiers = ActionIdentifiers().apply {
- blueprintName = "golden"
- blueprintVersion = "1.0.0"
- actionName = "resource-assignment"
- mode = "sync"
- }
-
- stepData = StepData().apply {
- name = "resource-assignment"
- }
-
- payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode
- }
-
- kt.sendDefault(input)
- log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
-
- Thread.sleep(1000)
-
- assertNotNull(event)
- }
-
- @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
- fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) {
- event = receivedEvent
- }
-
- private fun uploadBluePrint() {
- runBlocking {
- val body = MultipartBodyBuilder().apply {
- part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
- override fun getFilename(): String {
- return "test-cba.zip"
- }
- })
- }.build()
-
- webTestClient
- .post()
- .uri("/api/v1/execution-service/upload")
- .body(BodyInserters.fromMultipartData(body))
- .exchange()
- .expectStatus().isOk
- .returnResult<String>()
- .responseBody
- .awaitSingle()
- }
- }
-
- private fun loadCbaArchive():File {
- return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
- }
-
- @Configuration
- @EnableKafka
- open class ConsumerConfiguration {
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
- lateinit var bootstrapServers: String
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
- lateinit var groupId:String
-
- @Bean
- open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? {
- val configProperties = hashMapOf<String, Any>()
- configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
- configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000
-
- return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
- JsonDeserializer(ExecutionServiceInput::class.java))
- }
-
- @Bean
- open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
- val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
- factory.consumerFactory = consumerFactory2()
- return factory
- }
- }
-}
-
-
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
deleted file mode 100644
index dc1f38a63..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.StringSerializer
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.annotation.EnableKafka
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.serializer.JsonSerializer
-
-@Configuration
-open class ProducerConfiguration {
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
- lateinit var bootstrapServers: String
-
- open fun kpf(): ProducerFactory<String, ExecutionServiceInput> {
- val configs = HashMap<String, Any>()
- configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
- return DefaultKafkaProducerFactory(configs)
- }
-
- @Bean
- open fun kt(): KafkaTemplate<String, ExecutionServiceInput> {
- return KafkaTemplate<String, ExecutionServiceInput>(kpf())
- }
-} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
index ab3bac88f..d18b70010 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
@@ -33,10 +33,15 @@ blueprints.processor.functions.python.executor.executionPath=./../../../../compo
blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
# Kafka-message-lib Configuration
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10
+
+blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml
index 0c8d93bf0..dd81657a4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml
@@ -1,35 +1,35 @@
-<!--
- ~ Copyright © 2017-2018 AT&T Intellectual Property.
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <!-- encoders are assigned the type
- ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
- </encoder>
- </appender>
-
-
- <logger name="org.springframework" level="warn"/>
- <logger name="org.hibernate" level="info"/>
- <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
-
- <root level="warn">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>
+<!--
+ ~ Copyright © 2017-2018 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate" level="info"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>