aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt255
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt101
3 files changed, 362 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 59e3606ea..005223d9b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -20,7 +20,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message
import org.apache.kafka.streams.StreamsConfig
/** Producer Properties **/
-open class MessageProducerProperties
+open class MessageProducerProperties {
+ lateinit var type: String
+}
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
lateinit var bootstrapServers: String
@@ -35,7 +37,9 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
/** Consumer Properties **/
-open class MessageConsumerProperties
+open class MessageConsumerProperties {
+ lateinit var type: String
+}
open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
new file mode 100644
index 000000000..c6e923948
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -0,0 +1,255 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.data.RelationshipType
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.PropertiesAssignmentBuilder
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.RelationshipTemplateBuilder
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.TopologyTemplateBuilder
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.relationshipType
+
+/** Relationships Types DSL for Message Producer */
+fun BluePrintTypes.relationshipTypeConnectsToMessageProducer(): RelationshipType {
+ return relationshipType(
+ id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER,
+ version = BluePrintConstants.DEFAULT_VERSION_NUMBER,
+ derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO,
+ description = "Relationship connects to through message producer."
+ ) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintConstants.DATA_TYPE_MAP,
+ true,
+ "Connection Config details."
+ )
+ validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT))
+ }
+}
+
+fun BluePrintTypes.relationshipTypeConnectsToMessageConsumer(): RelationshipType {
+ return relationshipType(
+ id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER,
+ version = BluePrintConstants.DEFAULT_VERSION_NUMBER,
+ derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO,
+ description = "Relationship type connects to message consumer."
+ ) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintConstants.DATA_TYPE_MAP,
+ true,
+ "Connection Config details."
+ )
+ validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT))
+ }
+}
+
+/** Relationships Templates DSL for Message Producer */
+fun TopologyTemplateBuilder.relationshipTemplateMessageProducer(
+ name: String,
+ description: String,
+ block: MessageProducerRelationshipTemplateBuilder.() -> Unit
+) {
+ if (relationshipTemplates == null) relationshipTemplates = hashMapOf()
+ val relationshipTemplate =
+ MessageProducerRelationshipTemplateBuilder(name, description).apply(block).build()
+ relationshipTemplates!![relationshipTemplate.id!!] = relationshipTemplate
+}
+
+class MessageProducerRelationshipTemplateBuilder(name: String, description: String) :
+ RelationshipTemplateBuilder(
+ name,
+ BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, description
+ ) {
+
+ fun kafkaBasicAuth(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block)
+ )
+ }
+}
+
+fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaBasicAuthMessageProducerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
+
+class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
+
+ fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
+
+ fun bootstrapServers(bootstrapServers: JsonNode) =
+ property(KafkaBasicAuthMessageProducerProperties::bootstrapServers, bootstrapServers)
+
+ fun topic(topic: String) = topic(topic.asJsonPrimitive())
+
+ fun topic(topic: JsonNode) =
+ property(KafkaBasicAuthMessageProducerProperties::topic, topic)
+
+ fun clientId(clientId: String) = bootstrapServers(clientId.asJsonPrimitive())
+
+ fun clientId(clientId: JsonNode) =
+ property(KafkaBasicAuthMessageProducerProperties::clientId, clientId)
+
+ fun acks(acks: String) = acks(acks.asJsonPrimitive())
+
+ fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks)
+
+ fun retries(retries: Int) = retries(retries.asJsonPrimitive())
+
+ fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries)
+
+ fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive())
+
+ fun enableIdempotence(enableIdempotence: JsonNode) =
+ property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence)
+}
+
+/** Relationships Templates DSL for Message Consumer */
+fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer(
+ name: String,
+ description: String,
+ block: MessageConsumerRelationshipTemplateBuilder.() -> Unit
+) {
+ if (relationshipTemplates == null) relationshipTemplates = hashMapOf()
+ val relationshipTemplate =
+ MessageConsumerRelationshipTemplateBuilder(name, description).apply(block).build()
+ relationshipTemplates!![relationshipTemplate.id!!] = relationshipTemplate
+}
+
+class MessageConsumerRelationshipTemplateBuilder(name: String, description: String) :
+ RelationshipTemplateBuilder(
+ name,
+ BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, description
+ ) {
+
+ fun kafkaBasicAuth(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block)
+ )
+ }
+
+ fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block)
+ )
+ }
+}
+
+fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaBasicAuthMessageConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
+
+open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+
+ fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
+
+ fun bootstrapServers(bootstrapServers: JsonNode) =
+ property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers)
+
+ fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive())
+
+ fun groupId(groupId: JsonNode) =
+ property(KafkaMessageConsumerProperties::groupId, groupId)
+
+ fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive())
+
+ fun clientId(clientId: JsonNode) =
+ property(KafkaMessageConsumerProperties::clientId, clientId)
+
+ fun topic(topic: String) = topic(topic.asJsonPrimitive())
+
+ fun topic(topic: JsonNode) =
+ property(KafkaMessageConsumerProperties::topic, topic)
+
+ fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive())
+
+ fun autoCommit(autoCommit: JsonNode) =
+ property(KafkaMessageConsumerProperties::autoCommit, autoCommit)
+
+ fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
+
+ fun autoOffsetReset(autoOffsetReset: JsonNode) =
+ property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
+
+ fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive())
+
+ fun pollMillSec(pollMillSec: JsonNode) =
+ property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec)
+
+ fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive())
+
+ fun pollRecords(pollRecords: JsonNode) =
+ property(KafkaMessageConsumerProperties::pollRecords, pollRecords)
+}
+
+/** KafkaBasicAuthMessageConsumerProperties assignment builder */
+class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder()
+
+/** KafkaStreamsConsumerProperties assignment builder */
+open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+
+ fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
+
+ fun bootstrapServers(bootstrapServers: JsonNode) =
+ property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers)
+
+ fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive())
+
+ fun applicationId(applicationId: JsonNode) =
+ property(KafkaStreamsConsumerProperties::applicationId, applicationId)
+
+ fun topic(topic: String) = topic(topic.asJsonPrimitive())
+
+ fun topic(topic: JsonNode) =
+ property(KafkaStreamsConsumerProperties::topic, topic)
+
+ fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
+
+ fun autoOffsetReset(autoOffsetReset: JsonNode) =
+ property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset)
+
+ fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive())
+
+ fun processingGuarantee(processingGuarantee: JsonNode) =
+ property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee)
+}
+
+class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
new file mode 100644
index 000000000..9ece90ffc
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.streams.StreamsConfig
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.relationshipTypeConnectsTo
+import org.onap.ccsdk.cds.controllerblueprints.core.dsl.serviceTemplate
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+class MessagePropertiesDSLTest {
+
+ @Test
+ fun testMessageProducerDSL() {
+ val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
+ topologyTemplate {
+ relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") {
+ kafkaBasicAuth {
+ bootstrapServers("sample-bootstrapServers")
+ clientId("sample-client-id")
+ acks("all")
+ retries(3)
+ enableIdempotence(true)
+ topic("sample-topic")
+ }
+ }
+ }
+ relationshipTypes(
+ arrayListOf(
+ BluePrintTypes.relationshipTypeConnectsToMessageProducer(),
+ BluePrintTypes.relationshipTypeConnectsTo()
+ )
+ )
+ }
+ assertNotNull(serviceTemplate, "failed to create service template")
+ val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
+ assertNotNull(relationshipTemplates, "failed to get relationship templates")
+ assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match")
+ assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+ // println(serviceTemplate.asJsonString(true))
+ }
+
+ @Test
+ fun testMessageConsumerDSL() {
+ val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
+ topologyTemplate {
+ relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") {
+ kafkaBasicAuth {
+ bootstrapServers("sample-bootstrapServers")
+ clientId("sample-client-id")
+ groupId("sample-group-id")
+ topic("sample-topic")
+ autoCommit(false)
+ autoOffsetReset("latest")
+ pollMillSec(5000)
+ pollRecords(20)
+ }
+ }
+ relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") {
+ kafkaStreamsBasicAuth {
+ bootstrapServers("sample-bootstrapServers")
+ applicationId("sample-application-id")
+ autoOffsetReset("latest")
+ processingGuarantee(StreamsConfig.EXACTLY_ONCE)
+ topic("sample-streaming-topic")
+ }
+ }
+ }
+ relationshipTypes(
+ arrayListOf(
+ BluePrintTypes.relationshipTypeConnectsToMessageConsumer(),
+ BluePrintTypes.relationshipTypeConnectsTo()
+ )
+ )
+ }
+
+ assertNotNull(serviceTemplate, "failed to create service template")
+ val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
+ assertNotNull(relationshipTemplates, "failed to get relationship templates")
+ assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match")
+ assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+ assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth")
+ // println(serviceTemplate.asJsonString(true))
+ }
+}