summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt108
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt78
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt14
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt125
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt98
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt44
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt1
13 files changed, 276 insertions, 231 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index c6587c799..659295a6b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -57,6 +57,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
class MessageLibConstants {
companion object {
+
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index d76621c26..67dba1f19 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig
/** Common Properties **/
abstract class CommonProperties {
+
lateinit var type: String
lateinit var topic: String
lateinit var bootstrapServers: String
@@ -73,6 +74,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
/** SSL Auth */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -100,6 +102,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -109,8 +112,8 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
@@ -122,6 +125,7 @@ abstract class MessageConsumerProperties : CommonProperties()
/** Basic Auth */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
+
lateinit var applicationId: String
var autoOffsetReset: String = "latest"
var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
@@ -137,6 +141,7 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties()
/** SSL Auth */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -163,6 +168,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
/** (SASL) SCRAM SSL Auth */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -172,8 +178,8 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
@@ -182,6 +188,7 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr
/** Message Consumer Properties **/
/** Basic Auth */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
+
lateinit var groupId: String
lateinit var clientId: String
var autoCommit: Boolean = true
@@ -213,6 +220,7 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties()
/** SSL Auth */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -239,6 +247,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -248,8 +257,8 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index e9bc5d8ad..8b31de9b9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -102,15 +102,15 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
)
}
fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
)
}
}
@@ -125,14 +125,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth
fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaSslAuthMessageProducerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaScramSslAuthMessageProducerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -174,58 +174,60 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
}
open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
+ property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
+ property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
+ property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
+ property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
+ property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
+ property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
}
/** Relationships Templates DSL for Message Consumer */
@@ -255,15 +257,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
)
}
fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
)
}
@@ -276,15 +278,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
)
}
fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
)
}
}
@@ -299,14 +301,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth
fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaSslAuthMessageConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -320,14 +322,14 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa
fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsSslAuthConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -378,58 +380,60 @@ open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageCon
}
open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
+ property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
+ property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
+ property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
+ property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
+ property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
+ property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
}
/** KafkaStreamsConsumerProperties assignment builder */
@@ -462,56 +466,58 @@ open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageCon
}
open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
+ property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
+ property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
+ property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
+ property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 67fbef580..456bfbdd9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -59,12 +59,12 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaSslAuthMessageProducerProperties::class.java
+ prefix, KafkaSslAuthMessageProducerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+ prefix, KafkaScramSslAuthMessageProducerProperties::class.java
)
}
else -> {
@@ -113,33 +113,33 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
/** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaSslAuthMessageConsumerProperties::class.java
+ prefix, KafkaSslAuthMessageConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+ prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
)
}
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+ prefix, KafkaStreamsSslAuthConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+ prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
)
}
else -> {
@@ -178,44 +178,44 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
- BlueprintMessageConsumerService {
+ BlueprintMessageConsumerService {
- when (messageConsumerProperties.type) {
- /** Message Consumer */
- MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- return KafkaMessageConsumerService(
+ when (messageConsumerProperties.type) {
+ /** Message Consumer */
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
- return KafkaMessageConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
- return KafkaMessageConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
- )
- }
- /** Stream Consumer */
- MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ /** Stream Consumer */
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
- )
- }
- else -> {
- throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
+ )
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
+ }
}
}
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 311d35c38..b39d89bfd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -74,6 +74,7 @@ interface KafkaConsumerRecordsFunction : ConsumerFunction {
}
interface KafkaStreamConsumerFunction : ConsumerFunction {
+
suspend fun createTopology(
messageConsumerProperties: MessageConsumerProperties,
additionalConfig: Map<String, Any>?
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index 66d3a5b73..36392cff3 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -25,11 +25,17 @@ interface BlueprintMessageProducerService {
sendMessageNB(key, message, headers)
}
- fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
- sendMessageNB(key, topic, message, headers)
- }
+ fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean =
+ runBlocking {
+ sendMessageNB(key, topic, message, headers)
+ }
suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
- suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
+ suspend fun sendMessageNB(
+ key: String = UUID.randomUUID().toString(),
+ topic: String,
+ message: Any,
+ headers: MutableMap<String, String>? = null
+ ): Boolean
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 59e9192bb..eccf75301 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -44,6 +44,7 @@ class KafkaMessageProducerService(
private val messageLoggerService = MessageLoggerService()
companion object {
+
const val MAX_ERR_MSG_LEN = 128
}
@@ -113,8 +114,8 @@ class KafkaMessageProducerService(
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
- " [...]. Check Blueprint Processor logs for more information."
+ truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ " [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
var truncPayload = executionServiceOutput.payload.deepCopy()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
index 04b754b13..90b850017 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -61,7 +61,8 @@ class MessageLoggerService {
val headers = consumerRecord.headers().toMap()
val localhost = InetAddress.getLocalHost()
MDC.put(
- "InvokeTimestamp", ZonedDateTime
+ "InvokeTimestamp",
+ ZonedDateTime
.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
.format(DateTimeFormatter.ISO_INSTANT)
)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index b1af230b9..b68678baf 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -67,12 +67,12 @@ class MessagePropertiesDSLTest {
assertNotNull(relationshipTypes, "failed to get relationship types")
assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match")
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
)
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
)
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 77bdbe408..a69f9f51a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -59,36 +59,39 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+ classes = [
+ BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
+ ]
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
- "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
- "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
- "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
- "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
- "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
- "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
+ [
+ "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
- ]
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
+ ]
)
open class BlueprintMessageConsumerServiceTest {
@@ -206,46 +209,52 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaScramSslAuthConfig() {
val expectedConfig = mapOf<String, Any>(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
- ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
- CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
- SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
- SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
- SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
- SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
- SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
- SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
- SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"sample-user\" " +
- "password=\"secretpassword\";"
- )
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
val configProps = messageConsumerProperties.getConfig()
- assertEquals(messageConsumerProperties.topic,
- "default-topic",
- "Topic doesn't match the expected value"
+ assertEquals(
+ messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageConsumerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value"
)
- assertEquals(messageConsumerProperties.type,
- "kafka-scram-ssl-auth",
- "Authentication type doesn't match the expected value")
expectedConfig.forEach {
- assertTrue(configProps.containsKey(it.key),
- "Missing expected kafka config key : ${it.key}")
- assertEquals(configProps[it.key],
- it.value,
- "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
)
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 881f0b422..f88caa173 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -49,22 +49,25 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+ classes = [
+ BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
+ ]
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
- ]
+ [
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
+ ]
)
open class BlueprintMessageProducerServiceTest {
@@ -96,48 +99,53 @@ open class BlueprintMessageProducerServiceTest {
@Test
fun testKafkaScramSslAuthConfig() {
val expectedConfig = mapOf<String, Any>(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
- ProducerConfig.ACKS_CONFIG to "all",
- ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
- ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
- ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
- ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
- CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
- SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
- SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
- SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
- SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
- SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
- SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
- SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"sample-user\" " +
- "password=\"secretpassword\";"
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+ ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
)
val messageProducerProperties = bluePrintMessageLibPropertyService
- .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
val configProps = messageProducerProperties.getConfig()
- assertEquals(messageProducerProperties.topic,
- "default-topic",
- "Topic doesn't match the expected value"
+ assertEquals(
+ messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(
+ messageProducerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value"
)
- assertEquals(messageProducerProperties.type,
- "kafka-scram-ssl-auth",
- "Authentication type doesn't match the expected value")
expectedConfig.forEach {
- assertTrue(configProps.containsKey(it.key),
- "Missing expected kafka config key : ${it.key}"
+ assertTrue(
+ configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
)
- assertEquals(configProps[it.key],
- it.value,
- "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ assertEquals(
+ configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
)
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 44990ae7f..f488a4c74 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -41,31 +41,33 @@ import kotlin.test.assertNotNull
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+ classes = [
+ BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
+ ]
)
@TestPropertySource(
properties =
- [
- "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
+ [
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
- "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
- "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
- "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
- "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
- ]
+ ]
)
class KafkaStreamsConsumerServiceTest {
@@ -92,7 +94,7 @@ class KafkaStreamsConsumerServiceTest {
): Topology {
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
topology.addSource("Source", *topics.toTypedArray())
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
index 5d77c3746..aa38b6e5c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
@@ -31,6 +31,7 @@ import java.nio.charset.Charset
import java.util.UUID
class PriorityMessage : Serializable {
+
lateinit var id: String
lateinit var requestMessage: String
}