diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
3 files changed, 362 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 59e3606ea..005223d9b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -20,7 +20,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message import org.apache.kafka.streams.StreamsConfig /** Producer Properties **/ -open class MessageProducerProperties +open class MessageProducerProperties { + lateinit var type: String +} open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { lateinit var bootstrapServers: String @@ -35,7 +37,9 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() /** Consumer Properties **/ -open class MessageConsumerProperties +open class MessageConsumerProperties { + lateinit var type: String +} open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt new file mode 100644 index 000000000..c6e923948 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -0,0 +1,255 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.data.RelationshipType +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.PropertiesAssignmentBuilder +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.RelationshipTemplateBuilder +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.TopologyTemplateBuilder +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.relationshipType + +/** Relationships Types DSL for Message Producer */ +fun BluePrintTypes.relationshipTypeConnectsToMessageProducer(): RelationshipType { + return relationshipType( + id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, + version = BluePrintConstants.DEFAULT_VERSION_NUMBER, + derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, + description = "Relationship connects to through message producer." + ) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintConstants.DATA_TYPE_MAP, + true, + "Connection Config details." + ) + validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) + } +} + +fun BluePrintTypes.relationshipTypeConnectsToMessageConsumer(): RelationshipType { + return relationshipType( + id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, + version = BluePrintConstants.DEFAULT_VERSION_NUMBER, + derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, + description = "Relationship type connects to message consumer." + ) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintConstants.DATA_TYPE_MAP, + true, + "Connection Config details." + ) + validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) + } +} + +/** Relationships Templates DSL for Message Producer */ +fun TopologyTemplateBuilder.relationshipTemplateMessageProducer( + name: String, + description: String, + block: MessageProducerRelationshipTemplateBuilder.() -> Unit +) { + if (relationshipTemplates == null) relationshipTemplates = hashMapOf() + val relationshipTemplate = + MessageProducerRelationshipTemplateBuilder(name, description).apply(block).build() + relationshipTemplates!![relationshipTemplate.id!!] = relationshipTemplate +} + +class MessageProducerRelationshipTemplateBuilder(name: String, description: String) : + RelationshipTemplateBuilder( + name, + BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, description + ) { + + fun kafkaBasicAuth(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block) + ) + } +} + +fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaBasicAuthMessageProducerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() + +class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() { + + fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) + + fun bootstrapServers(bootstrapServers: JsonNode) = + property(KafkaBasicAuthMessageProducerProperties::bootstrapServers, bootstrapServers) + + fun topic(topic: String) = topic(topic.asJsonPrimitive()) + + fun topic(topic: JsonNode) = + property(KafkaBasicAuthMessageProducerProperties::topic, topic) + + fun clientId(clientId: String) = bootstrapServers(clientId.asJsonPrimitive()) + + fun clientId(clientId: JsonNode) = + property(KafkaBasicAuthMessageProducerProperties::clientId, clientId) + + fun acks(acks: String) = acks(acks.asJsonPrimitive()) + + fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks) + + fun retries(retries: Int) = retries(retries.asJsonPrimitive()) + + fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries) + + fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive()) + + fun enableIdempotence(enableIdempotence: JsonNode) = + property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence) +} + +/** Relationships Templates DSL for Message Consumer */ +fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer( + name: String, + description: String, + block: MessageConsumerRelationshipTemplateBuilder.() -> Unit +) { + if (relationshipTemplates == null) relationshipTemplates = hashMapOf() + val relationshipTemplate = + MessageConsumerRelationshipTemplateBuilder(name, description).apply(block).build() + relationshipTemplates!![relationshipTemplate.id!!] = relationshipTemplate +} + +class MessageConsumerRelationshipTemplateBuilder(name: String, description: String) : + RelationshipTemplateBuilder( + name, + BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, description + ) { + + fun kafkaBasicAuth(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block) + ) + } + + fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { + property( + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block) + ) + } +} + +fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaBasicAuthMessageConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { + val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build() + assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] = + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH.asJsonPrimitive() + return assignments.asJsonType() +} + +open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder() + +open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { + + fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) + + fun bootstrapServers(bootstrapServers: JsonNode) = + property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers) + + fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive()) + + fun groupId(groupId: JsonNode) = + property(KafkaMessageConsumerProperties::groupId, groupId) + + fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive()) + + fun clientId(clientId: JsonNode) = + property(KafkaMessageConsumerProperties::clientId, clientId) + + fun topic(topic: String) = topic(topic.asJsonPrimitive()) + + fun topic(topic: JsonNode) = + property(KafkaMessageConsumerProperties::topic, topic) + + fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive()) + + fun autoCommit(autoCommit: JsonNode) = + property(KafkaMessageConsumerProperties::autoCommit, autoCommit) + + fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) + + fun autoOffsetReset(autoOffsetReset: JsonNode) = + property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset) + + fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive()) + + fun pollMillSec(pollMillSec: JsonNode) = + property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec) + + fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive()) + + fun pollRecords(pollRecords: JsonNode) = + property(KafkaMessageConsumerProperties::pollRecords, pollRecords) +} + +/** KafkaBasicAuthMessageConsumerProperties assignment builder */ +class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder() + +/** KafkaStreamsConsumerProperties assignment builder */ +open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() { + + fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive()) + + fun bootstrapServers(bootstrapServers: JsonNode) = + property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers) + + fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive()) + + fun applicationId(applicationId: JsonNode) = + property(KafkaStreamsConsumerProperties::applicationId, applicationId) + + fun topic(topic: String) = topic(topic.asJsonPrimitive()) + + fun topic(topic: JsonNode) = + property(KafkaStreamsConsumerProperties::topic, topic) + + fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive()) + + fun autoOffsetReset(autoOffsetReset: JsonNode) = + property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset) + + fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive()) + + fun processingGuarantee(processingGuarantee: JsonNode) = + property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee) +} + +class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt new file mode 100644 index 000000000..9ece90ffc --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -0,0 +1,101 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.streams.StreamsConfig +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.relationshipTypeConnectsTo +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.serviceTemplate +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +class MessagePropertiesDSLTest { + + @Test + fun testMessageProducerDSL() { + val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { + topologyTemplate { + relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") { + kafkaBasicAuth { + bootstrapServers("sample-bootstrapServers") + clientId("sample-client-id") + acks("all") + retries(3) + enableIdempotence(true) + topic("sample-topic") + } + } + } + relationshipTypes( + arrayListOf( + BluePrintTypes.relationshipTypeConnectsToMessageProducer(), + BluePrintTypes.relationshipTypeConnectsTo() + ) + ) + } + assertNotNull(serviceTemplate, "failed to create service template") + val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates + assertNotNull(relationshipTemplates, "failed to get relationship templates") + assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match") + assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") + // println(serviceTemplate.asJsonString(true)) + } + + @Test + fun testMessageConsumerDSL() { + val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") { + topologyTemplate { + relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") { + kafkaBasicAuth { + bootstrapServers("sample-bootstrapServers") + clientId("sample-client-id") + groupId("sample-group-id") + topic("sample-topic") + autoCommit(false) + autoOffsetReset("latest") + pollMillSec(5000) + pollRecords(20) + } + } + relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") { + kafkaStreamsBasicAuth { + bootstrapServers("sample-bootstrapServers") + applicationId("sample-application-id") + autoOffsetReset("latest") + processingGuarantee(StreamsConfig.EXACTLY_ONCE) + topic("sample-streaming-topic") + } + } + } + relationshipTypes( + arrayListOf( + BluePrintTypes.relationshipTypeConnectsToMessageConsumer(), + BluePrintTypes.relationshipTypeConnectsTo() + ) + ) + } + + assertNotNull(serviceTemplate, "failed to create service template") + val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates + assertNotNull(relationshipTemplates, "failed to get relationship templates") + assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match") + assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth") + assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth") + // println(serviceTemplate.asJsonString(true)) + } +} |