diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
13 files changed, 276 insertions, 231 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index c6587c799..659295a6b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -57,6 +57,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep class MessageLibConstants { companion object { + const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index d76621c26..67dba1f19 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig /** Common Properties **/ abstract class CommonProperties { + lateinit var type: String lateinit var topic: String lateinit var bootstrapServers: String @@ -73,6 +74,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() /** SSL Auth */ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + lateinit var truststore: String lateinit var truststorePassword: String var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE @@ -100,6 +102,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer /** (SASL) SCRAM SSL Auth */ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" lateinit var scramUsername: String lateinit var scramPassword: String @@ -109,8 +112,8 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" return configProps } } @@ -122,6 +125,7 @@ abstract class MessageConsumerProperties : CommonProperties() /** Basic Auth */ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { + lateinit var applicationId: String var autoOffsetReset: String = "latest" var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE @@ -137,6 +141,7 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() /** SSL Auth */ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { + lateinit var truststore: String lateinit var truststorePassword: String var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE @@ -163,6 +168,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer /** (SASL) SCRAM SSL Auth */ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" lateinit var scramUsername: String lateinit var scramPassword: String @@ -172,8 +178,8 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" return configProps } } @@ -182,6 +188,7 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr /** Message Consumer Properties **/ /** Basic Auth */ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { + lateinit var groupId: String lateinit var clientId: String var autoCommit: Boolean = true @@ -213,6 +220,7 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() /** SSL Auth */ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + lateinit var truststore: String lateinit var truststorePassword: String var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE @@ -239,6 +247,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer /** (SASL) SCRAM SSL Auth */ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { + var saslMechanism: String = "SCRAM-SHA-512" lateinit var scramUsername: String lateinit var scramPassword: String @@ -248,8 +257,8 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" return configProps } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index e9bc5d8ad..8b31de9b9 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -102,15 +102,15 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) ) } fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) ) } } @@ -125,14 +125,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaSslAuthMessageProducerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaScramSslAuthMessageProducerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } @@ -174,58 +174,60 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro } open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) fun truststore(truststore: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::truststore, truststore) + property(KafkaSslAuthMessageProducerProperties::truststore, truststore) fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) fun truststorePassword(truststorePassword: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword) + property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword) fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) fun truststoreType(truststoreType: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType) + property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType) fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) fun keystore(keystore: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) fun keystorePassword(keystorePassword: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) fun keystoreType(keystoreType: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = - sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) + property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) } class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) fun saslMechanism(saslMechanism: JsonNode) = - property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism) + property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism) fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) fun scramUsername(scramUsername: JsonNode) = - property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername) + property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername) fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) fun scramPassword(scramPassword: JsonNode) = - property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword) + property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword) } /** Relationships Templates DSL for Message Consumer */ @@ -255,15 +257,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) ) } fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) ) } @@ -276,15 +278,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) ) } fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BluePrintConstants.PROPERTY_CONNECTION_CONFIG, - BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) ) } } @@ -299,14 +301,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaSslAuthMessageConsumerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } @@ -320,14 +322,14 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsSslAuthConsumerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] = - MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } @@ -378,58 +380,60 @@ open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageCon } open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) fun truststore(truststore: JsonNode) = - property(KafkaSslAuthMessageConsumerProperties::truststore, truststore) + property(KafkaSslAuthMessageConsumerProperties::truststore, truststore) fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) fun truststorePassword(truststorePassword: JsonNode) = - property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword) + property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword) fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) fun truststoreType(truststoreType: JsonNode) = - property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType) + property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType) fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) fun keystore(keystore: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) fun keystorePassword(keystorePassword: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) fun keystoreType(keystoreType: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = - sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = - property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) + property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) } class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) fun saslMechanism(saslMechanism: JsonNode) = - property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism) + property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism) fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) fun scramUsername(scramUsername: JsonNode) = - property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername) + property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername) fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) fun scramPassword(scramPassword: JsonNode) = - property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword) + property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword) } /** KafkaStreamsConsumerProperties assignment builder */ @@ -462,56 +466,58 @@ open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageCon } open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() { + fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive()) fun truststore(truststore: JsonNode) = - property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore) + property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore) fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive()) fun truststorePassword(truststorePassword: JsonNode) = - property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword) + property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword) fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive()) fun truststoreType(truststoreType: JsonNode) = - property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType) + property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType) fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive()) fun keystore(keystore: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystore, keystore) + property(KafkaSslAuthMessageProducerProperties::keystore, keystore) fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive()) fun keystorePassword(keystorePassword: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) + property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword) fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive()) fun keystoreType(keystoreType: JsonNode) = - property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) + property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) = - sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) + sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive()) fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) = - property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) + property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm) } class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() { + fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive()) fun saslMechanism(saslMechanism: JsonNode) = - property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism) + property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism) fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive()) fun scramUsername(scramUsername: JsonNode) = - property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername) + property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername) fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive()) fun scramPassword(scramPassword: JsonNode) = - property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword) + property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword) } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 67fbef580..456bfbdd9 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -59,12 +59,12 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer } MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaSslAuthMessageProducerProperties::class.java + prefix, KafkaSslAuthMessageProducerProperties::class.java ) } MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaScramSslAuthMessageProducerProperties::class.java + prefix, KafkaScramSslAuthMessageProducerProperties::class.java ) } else -> { @@ -113,33 +113,33 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer /** Message Consumer */ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java + prefix, KafkaBasicAuthMessageConsumerProperties::class.java ) } MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaSslAuthMessageConsumerProperties::class.java + prefix, KafkaSslAuthMessageConsumerProperties::class.java ) } MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaScramSslAuthMessageConsumerProperties::class.java + prefix, KafkaScramSslAuthMessageConsumerProperties::class.java ) } /** Stream Consumer */ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java ) } MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsSslAuthConsumerProperties::class.java + prefix, KafkaStreamsSslAuthConsumerProperties::class.java ) } MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java + prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java ) } else -> { @@ -178,44 +178,44 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer } private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): - BlueprintMessageConsumerService { + BlueprintMessageConsumerService { - when (messageConsumerProperties.type) { - /** Message Consumer */ - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - return KafkaMessageConsumerService( + when (messageConsumerProperties.type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + return KafkaMessageConsumerService( messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - return KafkaMessageConsumerService( + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + return KafkaMessageConsumerService( messageConsumerProperties as KafkaSslAuthMessageConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - return KafkaMessageConsumerService( + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + return KafkaMessageConsumerService( messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties - ) - } - /** Stream Consumer */ - MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - return KafkaStreamsConsumerService( + ) + } + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + return KafkaStreamsConsumerService( messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { - return KafkaStreamsConsumerService( + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + return KafkaStreamsConsumerService( messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { - return KafkaStreamsConsumerService( + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + return KafkaStreamsConsumerService( messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties - ) - } - else -> { - throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") + ) + } + else -> { + throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") + } } } - } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 311d35c38..b39d89bfd 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -74,6 +74,7 @@ interface KafkaConsumerRecordsFunction : ConsumerFunction { } interface KafkaStreamConsumerFunction : ConsumerFunction { + suspend fun createTopology( messageConsumerProperties: MessageConsumerProperties, additionalConfig: Map<String, Any>? diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index 66d3a5b73..36392cff3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -25,11 +25,17 @@ interface BlueprintMessageProducerService { sendMessageNB(key, message, headers) } - fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking { - sendMessageNB(key, topic, message, headers) - } + fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = + runBlocking { + sendMessageNB(key, topic, message, headers) + } suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean - suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean + suspend fun sendMessageNB( + key: String = UUID.randomUUID().toString(), + topic: String, + message: Any, + headers: MutableMap<String, String>? = null + ): Boolean } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 59e9192bb..eccf75301 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -44,6 +44,7 @@ class KafkaMessageProducerService( private val messageLoggerService = MessageLoggerService() companion object { + const val MAX_ERR_MSG_LEN = 128 } @@ -113,8 +114,8 @@ class KafkaMessageProducerService( /** Truncation of error messages */ var truncErrMsg = executionServiceOutput.status.errorMessage if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) { - truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" + - " [...]. Check Blueprint Processor logs for more information." + truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" + + " [...]. Check Blueprint Processor logs for more information." } /** Truncation for Command Executor responses */ var truncPayload = executionServiceOutput.payload.deepCopy() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt index 04b754b13..90b850017 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -61,7 +61,8 @@ class MessageLoggerService { val headers = consumerRecord.headers().toMap() val localhost = InetAddress.getLocalHost() MDC.put( - "InvokeTimestamp", ZonedDateTime + "InvokeTimestamp", + ZonedDateTime .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) .format(DateTimeFormatter.ISO_INSTANT) ) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index b1af230b9..b68678baf 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -67,12 +67,12 @@ class MessagePropertiesDSLTest { assertNotNull(relationshipTypes, "failed to get relationship types") assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match") assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}" ) assertNotNull( - relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], - "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" + relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER], + "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}" ) } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 77bdbe408..a69f9f51a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -59,36 +59,39 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] + classes = [ + BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class + ] ) @TestPropertySource( properties = - ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", - "blueprintsprocessor.messageconsumer.sample.topic=default-topic", - "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", - "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", - "blueprintsprocessor.messageconsumer.sample.pollRecords=1", - "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword", - "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword", - "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", - "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", + [ + "blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", - "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" - ] + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" + ] ) open class BlueprintMessageConsumerServiceTest { @@ -206,46 +209,52 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaScramSslAuthConfig() { val expectedConfig = mapOf<String, Any>( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", - ConsumerConfig.GROUP_ID_CONFIG to "sample-group", - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), - SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", - SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", - SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", - SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, - SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", - SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"sample-user\" " + - "password=\"secretpassword\";" - ) + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ConsumerConfig.GROUP_ID_CONFIG to "sample-group", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" + ) val messageConsumerProperties = bluePrintMessageLibPropertyService - .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample") + .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample") val configProps = messageConsumerProperties.getConfig() - assertEquals(messageConsumerProperties.topic, - "default-topic", - "Topic doesn't match the expected value" + assertEquals( + messageConsumerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals( + messageConsumerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value" ) - assertEquals(messageConsumerProperties.type, - "kafka-scram-ssl-auth", - "Authentication type doesn't match the expected value") expectedConfig.forEach { - assertTrue(configProps.containsKey(it.key), - "Missing expected kafka config key : ${it.key}") - assertEquals(configProps[it.key], - it.value, - "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + assertTrue( + configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" + ) + assertEquals( + configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" ) } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 881f0b422..f88caa173 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -49,22 +49,25 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] + classes = [ + BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class + ] ) @TestPropertySource( properties = - ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", - "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" - ] + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword" + ] ) open class BlueprintMessageProducerServiceTest { @@ -96,48 +99,53 @@ open class BlueprintMessageProducerServiceTest { @Test fun testKafkaScramSslAuthConfig() { val expectedConfig = mapOf<String, Any>( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, - ProducerConfig.ACKS_CONFIG to "all", - ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, - ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, - ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, - ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), - SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", - SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", - SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", - SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, - SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", - SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"sample-user\" " + - "password=\"secretpassword\";" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, + ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, + ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks", + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, + SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", + SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"sample-user\" " + + "password=\"secretpassword\";" ) val messageProducerProperties = bluePrintMessageLibPropertyService - .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample") + .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample") val configProps = messageProducerProperties.getConfig() - assertEquals(messageProducerProperties.topic, - "default-topic", - "Topic doesn't match the expected value" + assertEquals( + messageProducerProperties.topic, + "default-topic", + "Topic doesn't match the expected value" + ) + assertEquals( + messageProducerProperties.type, + "kafka-scram-ssl-auth", + "Authentication type doesn't match the expected value" ) - assertEquals(messageProducerProperties.type, - "kafka-scram-ssl-auth", - "Authentication type doesn't match the expected value") expectedConfig.forEach { - assertTrue(configProps.containsKey(it.key), - "Missing expected kafka config key : ${it.key}" + assertTrue( + configProps.containsKey(it.key), + "Missing expected kafka config key : ${it.key}" ) - assertEquals(configProps[it.key], - it.value, - "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" + assertEquals( + configProps[it.key], + it.value, + "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}" ) } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index 44990ae7f..f488a4c74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -41,31 +41,33 @@ import kotlin.test.assertNotNull @RunWith(SpringRunner::class) @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] + classes = [ + BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class + ] ) @TestPropertySource( properties = - [ - "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", - "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", - "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword", - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth", - "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword", - "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user", - "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword" + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword", + "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user", + "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword" - ] + ] ) class KafkaStreamsConsumerServiceTest { @@ -92,7 +94,7 @@ class KafkaStreamsConsumerServiceTest { ): Topology { val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") topology.addSource("Source", *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt index 5d77c3746..aa38b6e5c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt @@ -31,6 +31,7 @@ import java.nio.charset.Charset import java.util.UUID class PriorityMessage : Serializable { + lateinit var id: String lateinit var requestMessage: String } |