summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap
diff options
context:
space:
mode:
authorSingal, Kapil (ks220y) <ks220y@att.com>2020-09-22 12:16:46 -0400
committerSingal, Kapil (ks220y) <ks220y@att.com>2020-09-22 13:49:05 -0400
commit1072867dfac0df993cbd3e44bcc11a5cac7465fd (patch)
tree4a821659cf3b50cf946cbb535d528be8508e9fff /ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap
parentd97021cd756d63402545fdc2e14ac7611c3da118 (diff)
Enabling Code Formatter
Code Formatter was turned off due to java 11 migation Issue-ID: CCSDK-2852 Signed-off-by: Singal, Kapil (ks220y) <ks220y@att.com> Change-Id: I3d02ed3cc7a93d7551fe25356512cfe8db1517d8
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt108
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt78
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt14
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt3
8 files changed, 128 insertions, 103 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index c6587c799..659295a6b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -57,6 +57,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
class MessageLibConstants {
companion object {
+
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index d76621c26..67dba1f19 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig
/** Common Properties **/
abstract class CommonProperties {
+
lateinit var type: String
lateinit var topic: String
lateinit var bootstrapServers: String
@@ -73,6 +74,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
/** SSL Auth */
open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -100,6 +102,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -109,8 +112,8 @@ class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
@@ -122,6 +125,7 @@ abstract class MessageConsumerProperties : CommonProperties()
/** Basic Auth */
open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
+
lateinit var applicationId: String
var autoOffsetReset: String = "latest"
var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
@@ -137,6 +141,7 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties()
/** SSL Auth */
open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -163,6 +168,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
/** (SASL) SCRAM SSL Auth */
class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -172,8 +178,8 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
@@ -182,6 +188,7 @@ class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerPr
/** Message Consumer Properties **/
/** Basic Auth */
open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
+
lateinit var groupId: String
lateinit var clientId: String
var autoCommit: Boolean = true
@@ -213,6 +220,7 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties()
/** SSL Auth */
open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+
lateinit var truststore: String
lateinit var truststorePassword: String
var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
@@ -239,6 +247,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
/** (SASL) SCRAM SSL Auth */
class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+
var saslMechanism: String = "SCRAM-SHA-512"
lateinit var scramUsername: String
lateinit var scramPassword: String
@@ -248,8 +257,8 @@ class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerPr
configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
- "username=\"${scramUsername}\" " +
- "password=\"${scramPassword}\";"
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
return configProps
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index e9bc5d8ad..8b31de9b9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -102,15 +102,15 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
)
}
fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
)
}
}
@@ -125,14 +125,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth
fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaSslAuthMessageProducerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaScramSslAuthMessageProducerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -174,58 +174,60 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
}
open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
+ property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
+ property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
+ property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
+ property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
+ property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
+ property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
}
/** Relationships Templates DSL for Message Consumer */
@@ -255,15 +257,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
)
}
fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
)
}
@@ -276,15 +278,15 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
)
}
fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
- BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
- BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
)
}
}
@@ -299,14 +301,14 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth
fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaSslAuthMessageConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -320,14 +322,14 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa
fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsSslAuthConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] =
- MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
return assignments.asJsonType()
}
@@ -378,58 +380,60 @@ open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageCon
}
open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
+ property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
+ property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
+ property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
+ property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
+ property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
+ property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
}
/** KafkaStreamsConsumerProperties assignment builder */
@@ -462,56 +466,58 @@ open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageCon
}
open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() {
+
fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
fun truststore(truststore: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
+ property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
fun truststorePassword(truststorePassword: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
+ property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
fun truststoreType(truststoreType: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
+ property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
fun keystore(keystore: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
fun keystorePassword(keystorePassword: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
fun keystoreType(keystoreType: JsonNode) =
- property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
- sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
- property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+ property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
}
class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() {
+
fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
fun saslMechanism(saslMechanism: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
+ property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
fun scramUsername(scramUsername: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
fun scramPassword(scramPassword: JsonNode) =
- property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 67fbef580..456bfbdd9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -59,12 +59,12 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaSslAuthMessageProducerProperties::class.java
+ prefix, KafkaSslAuthMessageProducerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+ prefix, KafkaScramSslAuthMessageProducerProperties::class.java
)
}
else -> {
@@ -113,33 +113,33 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
/** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaSslAuthMessageConsumerProperties::class.java
+ prefix, KafkaSslAuthMessageConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+ prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
)
}
/** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+ prefix, KafkaStreamsSslAuthConsumerProperties::class.java
)
}
MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+ prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
)
}
else -> {
@@ -178,44 +178,44 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
- BlueprintMessageConsumerService {
+ BlueprintMessageConsumerService {
- when (messageConsumerProperties.type) {
- /** Message Consumer */
- MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- return KafkaMessageConsumerService(
+ when (messageConsumerProperties.type) {
+ /** Message Consumer */
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
- return KafkaMessageConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
- return KafkaMessageConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
- )
- }
- /** Stream Consumer */
- MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ /** Stream Consumer */
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
- )
- }
- MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
- return KafkaStreamsConsumerService(
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
- )
- }
- else -> {
- throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
+ )
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
+ }
}
}
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 311d35c38..b39d89bfd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -74,6 +74,7 @@ interface KafkaConsumerRecordsFunction : ConsumerFunction {
}
interface KafkaStreamConsumerFunction : ConsumerFunction {
+
suspend fun createTopology(
messageConsumerProperties: MessageConsumerProperties,
additionalConfig: Map<String, Any>?
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index 66d3a5b73..36392cff3 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -25,11 +25,17 @@ interface BlueprintMessageProducerService {
sendMessageNB(key, message, headers)
}
- fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
- sendMessageNB(key, topic, message, headers)
- }
+ fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean =
+ runBlocking {
+ sendMessageNB(key, topic, message, headers)
+ }
suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
- suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
+ suspend fun sendMessageNB(
+ key: String = UUID.randomUUID().toString(),
+ topic: String,
+ message: Any,
+ headers: MutableMap<String, String>? = null
+ ): Boolean
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 59e9192bb..eccf75301 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -44,6 +44,7 @@ class KafkaMessageProducerService(
private val messageLoggerService = MessageLoggerService()
companion object {
+
const val MAX_ERR_MSG_LEN = 128
}
@@ -113,8 +114,8 @@ class KafkaMessageProducerService(
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
- " [...]. Check Blueprint Processor logs for more information."
+ truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ " [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
var truncPayload = executionServiceOutput.payload.deepCopy()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
index 04b754b13..90b850017 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -61,7 +61,8 @@ class MessageLoggerService {
val headers = consumerRecord.headers().toMap()
val localhost = InetAddress.getLocalHost()
MDC.put(
- "InvokeTimestamp", ZonedDateTime
+ "InvokeTimestamp",
+ ZonedDateTime
.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
.format(DateTimeFormatter.ISO_INSTANT)
)