diff options
35 files changed, 1326 insertions, 631 deletions
diff --git a/components/model-catalog/blueprint-model/uat-blueprints/echo/Tests/uat.yaml b/components/model-catalog/blueprint-model/uat-blueprints/echo/Tests/uat.yaml index 116230929..5036a6fa2 100644 --- a/components/model-catalog/blueprint-model/uat-blueprints/echo/Tests/uat.yaml +++ b/components/model-catalog/blueprint-model/uat-blueprints/echo/Tests/uat.yaml @@ -26,9 +26,3 @@ processes: message: success payload: echo-response: {} - stepData: - name: echo - properties: - resource-assignment-params: - echo: *message - status: success diff --git a/components/model-catalog/blueprint-model/uat-blueprints/pnf_config/Tests/uat.yaml b/components/model-catalog/blueprint-model/uat-blueprints/pnf_config/Tests/uat.yaml index 3a5903c49..fbdb2eea8 100644 --- a/components/model-catalog/blueprint-model/uat-blueprints/pnf_config/Tests/uat.yaml +++ b/components/model-catalog/blueprint-model/uat-blueprints/pnf_config/Tests/uat.yaml @@ -31,32 +31,6 @@ processes: message: success payload: config-assign-response: {} - stepData: - name: config-assign - properties: - resource-assignment-params: - config-assign: &assignPatch - ietf-restconf:yang-patch: - patch-id: patch-1 - edit: - - edit-id: edit1 - operation: merge - target: / - value: { netconflist: { netconf: [ { netconf-id: "10", netconf-param: "1000" }]}} - - edit-id: edit2 - operation: merge - target: / - value: { netconflist: { netconf: [ { netconf-id: "20", netconf-param: "2000" }]}} - - edit-id: edit3 - operation: merge - target: / - value: { netconflist: { netconf: [ { netconf-id: "30", netconf-param: "3000" }]}} - status: success - responseNormalizerSpec: - stepData: - properties: - resource-assignment-params: - config-assign: ?from-json(.stepData.properties.resource-assignment-params.config-assign) - name: config-deploy request: commonHeader: *commonHeader @@ -84,11 +58,6 @@ processes: errorMessage: null eventType: EVENT_COMPONENT_EXECUTED message: success - stepData: - name: config-deploy - properties: - response-data: "" - status: success external-services: - selector: sdncodl @@ -127,7 +96,22 @@ external-services: path: [*configUri, *configletResourcePath] headers: Content-Type: application/yang.patch+json - body: *assignPatch + body: + ietf-restconf:yang-patch: + patch-id: patch-1 + edit: + - edit-id: edit1 + operation: merge + target: / + value: { netconflist: { netconf: [ { netconf-id: "10", netconf-param: "1000" }]}} + - edit-id: edit2 + operation: merge + target: / + value: { netconflist: { netconf: [ { netconf-id: "20", netconf-param: "2000" }]}} + - edit-id: edit3 + operation: merge + target: / + value: { netconflist: { netconf: [ { netconf-id: "30", netconf-param: "3000" }]}} - request: method: DELETE path: *configUri diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties index 019c5aabc..2fd595102 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties @@ -99,10 +99,15 @@ blueprintsprocessor.cliExecutor.enabled=true blueprintprocessor.remoteScriptCommand.enabled=false # Kafka-message-lib Configurations -blueprintsprocessor.messageclient.self-service-api.topic=producer.t -blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth -blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092 -blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t -blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id -blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id -blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false +blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false +blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id +blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t +blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000 + +blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/main/resources/application.properties b/ms/blueprintsprocessor/application/src/main/resources/application.properties index 4a4f79459..75d9cbaad 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application.properties @@ -92,10 +92,15 @@ blueprintsprocessor.restclient.aai-data.username=aai@aai.onap.org blueprintsprocessor.restclient.aai-data.password=demo123456! # Kafka-message-lib Configuration -blueprintsprocessor.messageclient.self-service-api.topic=producer.t -blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth -blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092 -blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t -blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id -blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id -blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false +blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false +blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t +blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id +blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000 + +blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageproducer.self-service-api.topic=producer.t diff --git a/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutor.kt b/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutor.kt index eafcaf44b..180ad7b48 100644 --- a/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutor.kt +++ b/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutor.kt @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor +package org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots import com.github.fge.jsonpatch.diff.JsonDiff +import org.apache.logging.log4j.util.Strings import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db.ResourceConfigSnapshot import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db.ResourceConfigSnapshot.Status.RUNNING @@ -144,18 +145,20 @@ open class ComponentConfigSnapshotsExecutor(private val cfgSnapshotService: Reso */ private suspend fun compareConfigurationSnapshot(resourceId: String, resourceType: String, contentType : String) { + val cfgRunning = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, RUNNING) + val cfgCandidate = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, CANDIDATE) + + if (cfgRunning.isEmpty() || cfgCandidate.isEmpty()) { + setNodeOutputProperties(OUTPUT_STATUS_SUCCESS, Strings.EMPTY) + return + } + when (contentType.toUpperCase()) { DIFF_JSON -> { - val cfgRunning = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, RUNNING) - val cfgCandidate = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, CANDIDATE) - val patchNode = JsonDiff.asJson(cfgRunning.jsonAsJsonType(), cfgCandidate.jsonAsJsonType()) setNodeOutputProperties(OUTPUT_STATUS_SUCCESS, patchNode.toString()) } DIFF_XML -> { - val cfgRunning = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, RUNNING) - val cfgCandidate = cfgSnapshotService.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, CANDIDATE) - val myDiff = DiffBuilder .compare(Input.fromString(cfgRunning)) .withTest(Input.fromString(cfgCandidate)) diff --git a/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotService.kt b/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotService.kt index 50c90f332..5fcba5b0c 100644 --- a/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotService.kt +++ b/ms/blueprintsprocessor/functions/config-snapshots/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext +import org.apache.logging.log4j.util.Strings import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db.ResourceConfigSnapshot.Status.RUNNING import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.slf4j.LoggerFactory @@ -40,7 +41,7 @@ class ResourceConfigSnapshotService(private val repository: ResourceConfigSnapsh status : ResourceConfigSnapshot.Status = RUNNING): String = withContext(Dispatchers.IO) { repository.findByResourceIdAndResourceTypeAndStatus(resourceId, resourceType, status) - ?.config_snapshot ?: throw NoSuchElementException() + ?.config_snapshot ?: Strings.EMPTY } suspend fun write(snapshot: String, resId: String, resType: String, diff --git a/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutorTest.kt b/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutorTest.kt index 79dd93037..c212908b9 100644 --- a/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutorTest.kt +++ b/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/ComponentConfigSnapshotsExecutorTest.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor +package org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots import com.fasterxml.jackson.databind.JsonNode import kotlinx.coroutines.runBlocking @@ -27,11 +27,12 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor.ComponentConfigSnapshotsExecutor.Companion.DIFF_JSON -import org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor.ComponentConfigSnapshotsExecutor.Companion.DIFF_XML -import org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor.ComponentConfigSnapshotsExecutor.Companion.OPERATION_DIFF -import org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor.ComponentConfigSnapshotsExecutor.Companion.OPERATION_FETCH -import org.onap.ccsdk.cds.blueprintsprocessor.functions.ansible.executor.ComponentConfigSnapshotsExecutor.Companion.OPERATION_STORE +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor.Companion.DIFF_JSON +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor.Companion.DIFF_XML +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor.Companion.OPERATION_DIFF +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor.Companion.OPERATION_FETCH +import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.ComponentConfigSnapshotsExecutor.Companion.OPERATION_STORE import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db.ResourceConfigSnapshot import org.onap.ccsdk.cds.blueprintsprocessor.functions.config.snapshots.db.ResourceConfigSnapshotService import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException @@ -192,9 +193,7 @@ class ComponentConfigSnapshotsExecutorTest { return@runBlocking } - // then; we should get error and the PAYLOAD payload in our output properties - assertTrue( bluePrintRuntimeService.getBluePrintError().errors.size > 0 ) - assertEquals(ComponentConfigSnapshotsExecutor.OUTPUT_STATUS_ERROR.asJsonPrimitive(), + assertEquals(ComponentConfigSnapshotsExecutor.OUTPUT_STATUS_SUCCESS.asJsonPrimitive(), bluePrintRuntimeService.getNodeTemplateAttributeValue(nodeTemplateName, ComponentConfigSnapshotsExecutor.OUTPUT_STATUS)) } @@ -234,7 +233,10 @@ class ComponentConfigSnapshotsExecutorTest { runBlocking { // when; asking for unknown content type diff operation; should get an error response try { - prepareRequestProperties(OPERATION_DIFF, "asdasd", "PNF", "YANG") + val resId = "121111" + val resType = "PNF" + cfgSnapshotService.write("snapshotConfig", resId, resType, ResourceConfigSnapshot.Status.CANDIDATE) + prepareRequestProperties(OPERATION_DIFF, resId, resType, "YANG") cfgSnapshotComponent.processNB(executionRequest) @@ -245,7 +247,6 @@ class ComponentConfigSnapshotsExecutorTest { } // then; we should get error in our output properties - assertTrue( bluePrintRuntimeService.getBluePrintError().errors.size == 1 ) assertEquals(ComponentConfigSnapshotsExecutor.OUTPUT_STATUS_ERROR.asJsonPrimitive(), bluePrintRuntimeService.getNodeTemplateAttributeValue(nodeTemplateName, ComponentConfigSnapshotsExecutor.OUTPUT_STATUS)) diff --git a/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotServiceTest.kt b/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotServiceTest.kt index 2830cb547..3c989c154 100644 --- a/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotServiceTest.kt +++ b/ms/blueprintsprocessor/functions/config-snapshots/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/config/snapshots/db/ResourceConfigSnapshotServiceTest.kt @@ -48,18 +48,6 @@ class ResourceConfigSnapshotServiceTest { } } - @Test(expected = NoSuchElementException::class) - fun notFoundEntryReturnsExceptionTest() { - val tr = ResourceConfigSnapshot() - runBlocking { - every { - cfgRepository.findByResourceIdAndResourceTypeAndStatus(any(), any(), any()) - } returns tr - val snap = cfgService.findByResourceIdAndResourceTypeAndStatus("MISSING_ID", "UNKNOWN_TYPE") - assertTrue ( snap.isBlank(), "Not found but returned a non empty string" ) - } - } - @Test fun createNewResourceConfigSnapshotTest() { val tr = ResourceConfigSnapshot() diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionConstants.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionConstants.kt index 2a9218df3..769644288 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionConstants.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionConstants.kt @@ -29,5 +29,4 @@ object ResourceResolutionConstants { const val RESOURCE_RESOLUTION_INPUT_OCCURRENCE = "occurrence" const val RESOURCE_RESOLUTION_INPUT_RESOURCE_ID = "resource-id" const val RESOURCE_RESOLUTION_INPUT_RESOURCE_TYPE = "resource-type" - val DATA_DICTIONARY_SECRET_SOURCE_TYPES = arrayOf("vault-data") //Add more secret data dictionary source type here }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtils.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtils.kt index 01cfd723b..117df1e5b 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtils.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtils.kt @@ -1,6 +1,6 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. - * Modifications Copyright © 2019 IBM. + * Modifications Copyright (c) 2019 IBM, Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.uti import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.databind.node.TextNode import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceAssignmentRuntimeService @@ -194,85 +195,214 @@ class ResourceAssignmentUtils { @Throws(BluePrintProcessorException::class) fun parseResponseNode(responseNode: JsonNode, resourceAssignment: ResourceAssignment, raRuntimeService: ResourceAssignmentRuntimeService, outputKeyMapping: MutableMap<String, String>): JsonNode { + try { + if ((resourceAssignment.property?.type).isNullOrEmpty()) { + throw BluePrintProcessorException("Couldn't get data dictionary type for dictionary name (${resourceAssignment.name})") + } + val type = resourceAssignment.property!!.type + return when (type) { + in BluePrintTypes.validPrimitiveTypes() -> { + parseResponseNodeForPrimitiveTypes(responseNode, resourceAssignment, outputKeyMapping) + } + in BluePrintTypes.validCollectionTypes() -> { + // Array Types + parseResponseNodeForCollection(responseNode, resourceAssignment, raRuntimeService, outputKeyMapping) + } + else -> { + // Complex Types + parseResponseNodeForComplexType(responseNode, resourceAssignment, raRuntimeService, outputKeyMapping) + } + } + } catch (e: Exception) { + logger.error("Fail to parse response data, error message $e") + throw BluePrintProcessorException("${e.message}", e) + } + } + + private fun parseResponseNodeForPrimitiveTypes(responseNode: JsonNode, resourceAssignment: ResourceAssignment, + outputKeyMapping: MutableMap<String, String>): JsonNode { val dName = resourceAssignment.dictionaryName - val dSource = resourceAssignment.dictionarySource - val type = nullToEmpty(resourceAssignment.property?.type) - lateinit var entrySchemaType: String - when (type) { - in BluePrintTypes.validPrimitiveTypes() -> { - if (dSource !in ResourceResolutionConstants.DATA_DICTIONARY_SECRET_SOURCE_TYPES) - logger.info("For template key (${resourceAssignment.name}) setting value as ($responseNode)") - val result = if (responseNode is ArrayNode) - responseNode.get(0) - else - responseNode - return if (result.isComplexType()) { - check(result.has(outputKeyMapping[dName])) { - "Fail to find output key mapping ($dName) in result." + logger.info("For template key (${resourceAssignment.name}) setting value as ($responseNode)") + + var result: JsonNode? = responseNode + if (responseNode.isComplexType()) { + val key = outputKeyMapping.keys.firstOrNull() + var returnNode: JsonNode? = responseNode + if (responseNode is ArrayNode) { + val arrayNode = responseNode.toList() + val firstElement = if (key.isNullOrEmpty()) { + arrayNode.first() + } + else{ + arrayNode.firstOrNull { element -> + element.isComplexType() && element.has(outputKeyMapping[key]) + } + } + + if (firstElement.isNull() || (firstElement!!.isComplexType() && !firstElement!!.has(outputKeyMapping[key])) + || (!result!!.isComplexType() && result is NullNode)) { + if (key.isNullOrEmpty()) { + throw BluePrintProcessorException("Fail to find mapping in the responseNode.") + } + else { + throw BluePrintProcessorException("Fail to find response with output key mapping ($key) in result.") } - result[outputKeyMapping[dName]] - } else { - result } + returnNode = firstElement } - in BluePrintTypes.validCollectionTypes() -> { - // Array Types - entrySchemaType = checkNotEmpty(resourceAssignment.property?.entrySchema?.type) { - "Entry schema is not defined for dictionary ($dName) info" + result = if (returnNode!!.isComplexType()) { + returnNode[outputKeyMapping[key]] + } + else { + returnNode + } + } + return result!! + } + + private fun parseResponseNodeForCollection(responseNode: JsonNode, resourceAssignment: ResourceAssignment, + raRuntimeService: ResourceAssignmentRuntimeService, + outputKeyMapping: MutableMap<String, String>): JsonNode { + val dName = resourceAssignment.dictionaryName + if ((resourceAssignment.property?.entrySchema?.type).isNullOrEmpty()) { + throw BluePrintProcessorException("Couldn't get data type for dictionary type " + + "(${resourceAssignment.property!!.type}) and dictionary name ($dName)") + } + val entrySchemaType = resourceAssignment.property!!.entrySchema!!.type + + var arrayNode = JacksonUtils.objectMapper.createArrayNode() + + if (outputKeyMapping.isNotEmpty()) { + when (responseNode) { + is ArrayNode -> { + val responseArrayNode = responseNode.toList() + for (responseSingleJsonNode in responseArrayNode) { + val arrayChildNode = parseArrayNodeElementWithOutputKeyMapping(raRuntimeService, responseSingleJsonNode, + outputKeyMapping, entrySchemaType) + arrayNode.add(arrayChildNode) + } } - val arrayNode = JacksonUtils.objectMapper.createArrayNode() - lateinit var responseValueNode: JsonNode - lateinit var propertyType: String - outputKeyMapping.map { - val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + is ObjectNode -> { val responseArrayNode = responseNode.rootFieldsToMap() - outer@ for ((key, responseSingleJsonNode) in responseArrayNode) { - if (key == it.key) { - if (entrySchemaType in BluePrintTypes.validPrimitiveTypes()) { - responseValueNode = responseSingleJsonNode - propertyType = entrySchemaType - - } else { - responseValueNode = responseSingleJsonNode.get(it.key) - propertyType = getPropertyType(raRuntimeService, entrySchemaType, it.key) - } - if (resourceAssignment.dictionarySource !in ResourceResolutionConstants.DATA_DICTIONARY_SECRET_SOURCE_TYPES) - logger.info("For List Type Resource: key (${it.key}), value ($responseValueNode), " + - "type ({$propertyType})") - JacksonUtils.populateJsonNodeValues(it.value, - responseValueNode, propertyType, arrayChildNode) - arrayNode.add(arrayChildNode) - break@outer - } - } + val arrayNodeResult = parseObjectNodeWithOutputKeyMapping(responseArrayNode, outputKeyMapping, entrySchemaType) + arrayNode.addAll(arrayNodeResult) + } + else -> { + throw BluePrintProcessorException("Key-value response expected to match the responseNode.") } - if (resourceAssignment.dictionarySource !in ResourceResolutionConstants.DATA_DICTIONARY_SECRET_SOURCE_TYPES) - logger.info("For template key (${resourceAssignment.name}) setting value as ($arrayNode)") - - return arrayNode } - else -> { - // Complex Types - entrySchemaType = checkNotEmpty(resourceAssignment.property?.type) { - "Entry schema is not defined for dictionary ($dName) info" + } + else { + when (responseNode) { + is ArrayNode -> { + responseNode.forEach { elementNode -> + arrayNode.add(elementNode) + } } - val objectNode = JacksonUtils.objectMapper.createObjectNode() + is ObjectNode -> { + val responseArrayNode = responseNode.rootFieldsToMap() + for ((key, responseSingleJsonNode) in responseArrayNode) { + val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + JacksonUtils.populateJsonNodeValues(key, responseSingleJsonNode, entrySchemaType, arrayChildNode) + arrayNode.add(arrayChildNode) + } + } + else -> { + arrayNode.add(responseNode) + } + } + } + + logger.info("For template key (${resourceAssignment.name}) setting value as ($arrayNode)") + + return arrayNode + } + + private fun parseResponseNodeForComplexType(responseNode: JsonNode, resourceAssignment: ResourceAssignment, + raRuntimeService: ResourceAssignmentRuntimeService, + outputKeyMapping: MutableMap<String, String>): JsonNode { + val entrySchemaType = resourceAssignment.property!!.type + val dictionaryName = resourceAssignment.dictionaryName!! + + var result: ObjectNode + if (checkOutputKeyMappingInDataTypeProperties(entrySchemaType, outputKeyMapping, raRuntimeService)) + { + result = parseArrayNodeElementWithOutputKeyMapping(raRuntimeService, responseNode, outputKeyMapping, entrySchemaType) + } + else { + val childNode = JacksonUtils.objectMapper.createObjectNode() + if (outputKeyMapping.isNotEmpty()) { outputKeyMapping.map { - val responseKeyValue = responseNode.get(it.key) - val propertyTypeForDataType = ResourceAssignmentUtils - .getPropertyType(raRuntimeService, entrySchemaType, it.key) + val responseKeyValue = if (responseNode.has(it.key)) { + responseNode.get(it.key) + } + else { + NullNode.getInstance() + } - if (resourceAssignment.dictionarySource !in ResourceResolutionConstants.DATA_DICTIONARY_SECRET_SOURCE_TYPES) - logger.info("For List Type Resource: key (${it.key}), value ($responseKeyValue), type ({$propertyTypeForDataType})") - JacksonUtils.populateJsonNodeValues(it.value, responseKeyValue, propertyTypeForDataType, objectNode) + JacksonUtils.populateJsonNodeValues(it.value, + responseKeyValue, entrySchemaType, childNode) } + } + else { + JacksonUtils.populateJsonNodeValues(dictionaryName, responseNode, entrySchemaType, childNode) + } + result = childNode + } + return result + } + + private fun parseArrayNodeElementWithOutputKeyMapping(raRuntimeService: ResourceAssignmentRuntimeService, + responseSingleJsonNode: JsonNode, outputKeyMapping: + MutableMap<String, String>, entrySchemaType: String): ObjectNode { + val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + + outputKeyMapping.map { + val responseKeyValue = if (responseSingleJsonNode.has(it.key)) { + responseSingleJsonNode.get(it.key) + } + else { + NullNode.getInstance() + } + val propertyTypeForDataType = ResourceAssignmentUtils + .getPropertyType(raRuntimeService, entrySchemaType, it.key) + + logger.info("For List Type Resource: key (${it.key}), value ($responseKeyValue), " + + "type ({$propertyTypeForDataType})") - if (resourceAssignment.dictionarySource !in ResourceResolutionConstants.DATA_DICTIONARY_SECRET_SOURCE_TYPES) - logger.info("For template key (${resourceAssignment.name}) setting value as ($objectNode)") + JacksonUtils.populateJsonNodeValues(it.value, + responseKeyValue, propertyTypeForDataType, arrayChildNode) + } + + return arrayChildNode + } + + private fun parseObjectNodeWithOutputKeyMapping(responseArrayNode: MutableMap<String, JsonNode>, + outputKeyMapping: MutableMap<String, String>, + entrySchemaType: String): ArrayNode { + val arrayNode = JacksonUtils.objectMapper.createArrayNode() + outputKeyMapping.map { + val objectNode = JacksonUtils.objectMapper.createObjectNode() + val responseSingleJsonNode = responseArrayNode.filterKeys { key -> key == it.key }.entries.firstOrNull() - return objectNode + if (responseSingleJsonNode == null) { + JacksonUtils.populateJsonNodeValues(it.value, NullNode.getInstance(), entrySchemaType, objectNode) } + else + { + JacksonUtils.populateJsonNodeValues(it.value, responseSingleJsonNode.value, entrySchemaType, objectNode) + } + arrayNode.add(objectNode) } + + return arrayNode + } + + private fun checkOutputKeyMappingInDataTypeProperties(dataTypeName: String, outputKeyMapping: MutableMap<String, String>, + raRuntimeService: ResourceAssignmentRuntimeService): Boolean { + val dataTypeProps = raRuntimeService.bluePrintContext().dataTypeByName(dataTypeName)?.properties + val result = outputKeyMapping.filterKeys { !dataTypeProps!!.containsKey(it) }.keys.firstOrNull() + return result == null } } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtilsTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtilsTest.kt index 9b87c12eb..9365c3e34 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtilsTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/utils/ResourceAssignmentUtilsTest.kt @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (c) 2019 IBM, Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,16 +19,86 @@ * ============LICENSE_END========================================================= */ - package org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.utils +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.TextNode +import io.mockk.every +import io.mockk.spyk +import org.junit.Before import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceAssignmentRuntimeService +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.data.DataType +import org.onap.ccsdk.cds.controllerblueprints.core.data.EntrySchema import org.onap.ccsdk.cds.controllerblueprints.core.data.PropertyDefinition +import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment import kotlin.test.assertEquals +data class IpAddress(val port: String, val ip: String) +data class Host(val name: String, val ipAddress: IpAddress) +data class ExpectedResponseIp(val ip: String) +data class ExpectedResponsePort(val port: String) + class ResourceAssignmentUtilsTest { + private lateinit var resourceAssignmentRuntimeService: ResourceAssignmentRuntimeService + + @Before + fun setup() { + + val bluePrintContext = BluePrintMetadataUtils.getBluePrintContext( + "./../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") + + resourceAssignmentRuntimeService = spyk(ResourceAssignmentRuntimeService("1234", bluePrintContext)) + + val propertiesDefinition1 = PropertyDefinition().apply { + type = "string" + id = "port" + } + + val propertiesDefinition2 = PropertyDefinition().apply { + type = "string" + id = "ip" + } + + val propertiesDefinition3 = PropertyDefinition().apply { + type = "string" + id = "name" + } + + val propertiesDefinition4 = PropertyDefinition().apply { + type = "ip-address" + id = "ipAddress" + } + + var mapOfPropertiesIpAddress = mutableMapOf<String, PropertyDefinition>() + mapOfPropertiesIpAddress["port"] = propertiesDefinition1 + mapOfPropertiesIpAddress["ip"] = propertiesDefinition2 + + var mapOfPropertiesHost = mutableMapOf<String, PropertyDefinition>() + mapOfPropertiesHost["name"] = propertiesDefinition3 + mapOfPropertiesHost["ipAddress"] = propertiesDefinition4 + + val myDataTypeIpaddress = DataType().apply { + id = "ip-address" + properties = mapOfPropertiesIpAddress + } + + val myDataTypeHost = DataType().apply { + id = "host" + properties = mapOfPropertiesHost + } + + every { resourceAssignmentRuntimeService.bluePrintContext().dataTypeByName("ip-address") } returns myDataTypeIpaddress + + every { resourceAssignmentRuntimeService.bluePrintContext().dataTypeByName("host") } returns myDataTypeHost + + every { resourceAssignmentRuntimeService.setNodeTemplateAttributeValue(any(), any(), any()) } returns Unit + } @Test fun `generateResourceDataForAssignments - positive test`() { @@ -43,7 +114,6 @@ class ResourceAssignmentUtilsTest { //then the assignment should produce a valid result val expected = "{\n" + " \"pnf-id\" : \"valid_value\"\n" + "}" assertEquals(expected, outcome, "unexpected outcome generated") - } @Test @@ -76,4 +146,243 @@ class ResourceAssignmentUtilsTest { } return resourceAssignmentForTest } + + @Test + fun parseResponseNodeTestForPrimitivesTypes(){ + // Input values for primitive type + val keyValue = mutableMapOf<String, String>() + keyValue["value"]= "1.2.3.1" + val expectedPrimitiveType = TextNode("1.2.3.1") + + var outcome = prepareResponseNodeForTest("sample-value", "string", + "", "1.2.3.1".asJsonPrimitive()) + assertEquals(expectedPrimitiveType, outcome, "Unexpected outcome returned for primitive type of simple String") + outcome = prepareResponseNodeForTest("sample-key-value", "string", "", keyValue) + assertEquals(expectedPrimitiveType, outcome, "Unexpected outcome returned for primitive type of key-value String") + } + + @Test + fun parseResponseNodeTestForCollectionsOfString(){ + // Input values for collection type + val mapOfString = mutableMapOf<String, String>() + mapOfString["value1"] = "1.2.3.1" + mapOfString["port"] = "8888" + mapOfString["value2"] = "1.2.3.2" + val arrayOfKeyValue = arrayListOf(ExpectedResponseIp("1.2.3.1"), + ExpectedResponsePort( "8888"), ExpectedResponseIp("1.2.3.2")) + + val mutableMapKeyValue = mutableMapOf<String, String>() + mutableMapKeyValue["value1"] = "1.2.3.1" + mutableMapKeyValue["port"] = "8888" + + //List + val expectedListOfString = arrayOfKeyValue.asJsonType() + var outcome = prepareResponseNodeForTest("listOfString", "list", + "string", mapOfString.asJsonType()) + assertEquals(expectedListOfString, outcome, "unexpected outcome returned for list of String") + + //Map + val expectedMapOfString = mutableMapOf<String, JsonNode>() + expectedMapOfString["ip"] = "1.2.3.1".asJsonPrimitive() + expectedMapOfString["port"] = "8888".asJsonPrimitive() + + val arrayNode = JacksonUtils.objectMapper.createArrayNode() + expectedMapOfString.map { + val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + arrayChildNode.set(it.key, it.value) + arrayNode.add(arrayChildNode) + } + val arrayChildNode1 = JacksonUtils.objectMapper.createObjectNode() + arrayChildNode1.set("ip", NullNode.getInstance()) + arrayNode.add(arrayChildNode1) + outcome = prepareResponseNodeForTest("mapOfString", "map", "string", + mutableMapKeyValue.asJsonType()) + assertEquals(arrayNode, outcome, "unexpected outcome returned for map of String") + } + + @Test + fun parseResponseNodeTestForCollectionsOfJsonNode(){ + // Input values for collection type + val mapOfString = mutableMapOf<String, JsonNode>() + mapOfString["value1"] = "1.2.3.1".asJsonPrimitive() + mapOfString["port"] = "8888".asJsonPrimitive() + mapOfString["value2"] = "1.2.3.2".asJsonPrimitive() + val arrayOfKeyValue = arrayListOf(ExpectedResponseIp("1.2.3.1"), + ExpectedResponsePort( "8888"), ExpectedResponseIp("1.2.3.2")) + + val mutableMapKeyValue = mutableMapOf<String, JsonNode>() + mutableMapKeyValue["value1"] = "1.2.3.1".asJsonPrimitive() + mutableMapKeyValue["port"] = "8888".asJsonPrimitive() + + //List + val expectedListOfString = arrayOfKeyValue.asJsonType() + var outcome = prepareResponseNodeForTest("listOfString", "list", + "string", mapOfString.asJsonType()) + assertEquals(expectedListOfString, outcome, "unexpected outcome returned for list of String") + + //Map + val expectedMapOfString = mutableMapOf<String, JsonNode>() + expectedMapOfString["ip"] = "1.2.3.1".asJsonPrimitive() + expectedMapOfString["port"] = "8888".asJsonPrimitive() + val arrayNode = JacksonUtils.objectMapper.createArrayNode() + expectedMapOfString.map { + val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + arrayChildNode.set(it.key, it.value) + arrayNode.add(arrayChildNode) + } + val arrayChildNode1 = JacksonUtils.objectMapper.createObjectNode() + arrayChildNode1.set("ip", NullNode.getInstance()) + arrayNode.add(arrayChildNode1) + outcome = prepareResponseNodeForTest("mapOfString", "map", + "string", mutableMapKeyValue.asJsonType()) + assertEquals(arrayNode, outcome, "unexpected outcome returned for map of String") + } + + @Test + fun parseResponseNodeTestForCollectionsOfComplexType(){ + // Input values for collection type + val mapOfComplexType = mutableMapOf<String, JsonNode>() + mapOfComplexType["value1"] = IpAddress("1111", "1.2.3.1").asJsonType() + mapOfComplexType["value2"] = IpAddress("2222", "1.2.3.2").asJsonType() + mapOfComplexType["value3"] = IpAddress("3333", "1.2.3.3").asJsonType() + + //List + val arrayNode = JacksonUtils.objectMapper.createArrayNode() + mapOfComplexType.map { + val arrayChildNode = JacksonUtils.objectMapper.createObjectNode() + arrayChildNode.set("ipAddress", it.value) + arrayNode.add(arrayChildNode) + } + var outcome = prepareResponseNodeForTest("listOfMyDataType", "list", + "ip-address", mapOfComplexType.asJsonType()) + assertEquals(arrayNode, outcome, "unexpected outcome returned for list of String") + } + + @Test + fun `parseResponseNodeTestForComplexType find one output key mapping`(){ + // Input values for complex type + val objectNode = JacksonUtils.objectMapper.createObjectNode() + + // Input values for collection type + val mapOfComplexType = mutableMapOf<String, JsonNode>() + mapOfComplexType["value"] = Host("my-ipAddress", IpAddress("1111", "1.2.3.1")).asJsonType() + mapOfComplexType["port"] = "8888".asJsonType() + mapOfComplexType["something"] = "1.2.3.2".asJsonType() + + val expectedComplexType = objectNode.set("ipAddress", Host("my-ipAddress", IpAddress("1111", "1.2.3.1")).asJsonType()) + val outcome = prepareResponseNodeForTest("complexTypeOneKeys", "host", + "", mapOfComplexType.asJsonType()) + assertEquals(expectedComplexType, outcome, "Unexpected outcome returned for complex type") + } + + @Test + fun `parseResponseNodeTestForComplexType find all output key mapping`(){ + // Input values for complex type + val objectNode = JacksonUtils.objectMapper.createObjectNode() + + // Input values for collection type + val mapOfComplexType = mutableMapOf<String, JsonNode>() + mapOfComplexType["name"] = "my-ipAddress".asJsonType() + mapOfComplexType["ipAddress"] = IpAddress("1111", "1.2.3.1").asJsonType() + + val expectedComplexType = Host("my-ipAddress", IpAddress("1111", "1.2.3.1")).asJsonType() + val outcome = prepareResponseNodeForTest("complexTypeAllKeys", "host", + "", mapOfComplexType.asJsonType()) + assertEquals(expectedComplexType, outcome, "Unexpected outcome returned for complex type") + } + + private fun prepareResponseNodeForTest(dictionary_source: String, sourceType: String, entrySchema: String, + response: Any): JsonNode { + + val resourceAssignment = when (sourceType) { + "list", "map" -> { + prepareRADataDictionaryCollection(dictionary_source, sourceType, entrySchema) + } + "string" -> { + prepareRADataDictionaryOfPrimaryType(dictionary_source) + } + else -> { + prepareRADataDictionaryComplexType(dictionary_source, sourceType, entrySchema) + } + } + + val responseNode = checkNotNull(JacksonUtils.getJsonNode(response)) { + "Failed to get database query result into Json node." + } + + val outputKeyMapping = prepareOutputKeyMapping(dictionary_source) + + return ResourceAssignmentUtils.parseResponseNode(responseNode, resourceAssignment, resourceAssignmentRuntimeService, outputKeyMapping) + } + + private fun prepareRADataDictionaryOfPrimaryType(dictionary_source: String): ResourceAssignment { + return ResourceAssignment().apply { + name = "ipAddress" + dictionaryName = "sample-ip" + dictionarySource = "$dictionary_source" + property = PropertyDefinition().apply { + type = "string" + } + } + } + + private fun prepareRADataDictionaryCollection(dictionary_source: String, sourceType: String, schema: String): ResourceAssignment { + return ResourceAssignment().apply { + name = "ipAddress-list" + dictionaryName = "sample-licenses" + dictionarySource = "$dictionary_source" + property = PropertyDefinition().apply { + type = "$sourceType" + entrySchema = EntrySchema().apply { + type = "$schema" + } + } + } + } + + private fun prepareRADataDictionaryComplexType(dictionary_source: String, sourceType: String, schema: String): ResourceAssignment { + return ResourceAssignment().apply { + name = "ipAddress-complexType" + dictionaryName = "sample-licenses" + dictionarySource = "$dictionary_source" + property = PropertyDefinition().apply { + type = "$sourceType" + } + } + } + + private fun prepareOutputKeyMapping(dictionary_source: String): MutableMap<String, String> { + val outputMapping = mutableMapOf<String, String>() + + when (dictionary_source) { + "listOfString", "mapOfString" -> { + //List of string + outputMapping["value1"] = "ip" + outputMapping["port"] = "port" + outputMapping["value2"] = "ip" + } + "listOfMyDataType", "mapOfMyDataType" -> { + //List or map of complex Type + outputMapping["value1"] = "ipAddress" + outputMapping["value2"] = "ipAddress" + outputMapping["value3"] = "ipAddress" + } + "sample-key-value", "sample-value" -> { + //Primary Type + if (dictionary_source=="sample-key-value") + outputMapping["sample-ip"] = "value" + } + else -> { + //Complex Type + if (dictionary_source == "complexTypeOneKeys") + outputMapping["value"] = "ipAddress" + else { + outputMapping["name"] = "name" + outputMapping["ipAddress"] = "ipAddress" + } + + } + } + return outputMapping + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 644c51860..27a444bdc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -17,6 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration @@ -26,10 +31,36 @@ import org.springframework.context.annotation.Configuration @EnableConfigurationProperties open class BluePrintMessageLibConfiguration +/** + * Exposed Dependency Service by this Message Lib Module + */ +fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService = + instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) + +/** Extension functions for message producer service **/ +fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(selector) +} + + +fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(jsonNode) +} + +/** Extension functions for message consumer service **/ +fun BluePrintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(selector) +} + +fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(jsonNode) +} + class MessageLibConstants { companion object { const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" - const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient." + const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." + const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index e621ec66f..ab04054fe 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -16,7 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message - +/** Producer Properties **/ open class MessageProducerProperties @@ -24,4 +24,18 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() lateinit var bootstrapServers: String var topic: String? = null var clientId: String? = null -}
\ No newline at end of file +} + +/** Consumer Properties **/ + +open class MessageConsumerProperties + +open class KafkaMessageConsumerProperties : MessageConsumerProperties() { + lateinit var bootstrapServers: String + lateinit var groupId: String + var clientId: String? = null + var topic: String? = null + var pollMillSec: Long = 1000 +} + +open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index fb01ce179..7c56ea432 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -18,9 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.* import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @@ -28,22 +26,22 @@ import org.springframework.stereotype.Service @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) { - fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService { - val messageClientProperties = messageClientProperties(jsonNode) - return blueprintMessageClientService(messageClientProperties) + fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { + val messageClientProperties = messageProducerProperties(jsonNode) + return blueprintMessageProducerService(messageClientProperties) } - fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService { - val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector" - val messageClientProperties = messageClientProperties(prefix) - return blueprintMessageClientService(messageClientProperties) + fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" + val messageClientProperties = messageProducerProperties(prefix) + return blueprintMessageProducerService(messageClientProperties) } - fun messageClientProperties(prefix: String): MessageProducerProperties { + fun messageProducerProperties(prefix: String): MessageProducerProperties { val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java) return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageClientProperties(prefix) + kafkaBasicAuthMessageProducerProperties(prefix) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -51,7 +49,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties { + fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties { val type = jsonNode.get("type").textValue() return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { @@ -63,7 +61,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties) + private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties) : BlueprintMessageProducerService { when (MessageProducerProperties) { @@ -76,9 +74,67 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { + private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { return bluePrintProperties.propertyBeanType( prefix, KafkaBasicAuthMessageProducerProperties::class.java) } + /** Consumer Property Lib Service Implementation **/ + + /** Return Message Consumer Service for [jsonNode] definitions. */ + fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + val messageConsumerProperties = messageConsumerProperties(jsonNode) + return blueprintMessageConsumerService(messageConsumerProperties) + } + + /** Return Message Consumer Service for [selector] definitions. */ + fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector" + val messageClientProperties = messageConsumerProperties(prefix) + return blueprintMessageConsumerService(messageClientProperties) + } + + /** Return Message Consumer Properties for [prefix] definitions. */ + fun messageConsumerProperties(prefix: String): MessageConsumerProperties { + val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java) + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + kafkaBasicAuthMessageConsumerProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { + val type = jsonNode.get("type").textValue() + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties) + : BlueprintMessageConsumerService { + + when (messageConsumerProperties) { + is KafkaBasicAuthMessageConsumerProperties -> { + return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) + } + else -> { + throw BluePrintProcessorException("couldn't get Message client service for") + } + } + } + + private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { + return bluePrintProperties.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java) + } + } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt new file mode 100644 index 000000000..25f0bf44d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel + +interface BlueprintMessageConsumerService { + + /** Subscribe to the Kafka channel with [additionalConfig] */ + suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> + + /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ + suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + + /** close the channel, consumer and other resources */ + suspend fun shutDown() + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt new file mode 100644 index 000000000..5a9e61bfd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -0,0 +1,116 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.time.Duration +import kotlin.concurrent.thread + +class KafkaBasicAuthMessageConsumerService( + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) + : BlueprintMessageConsumerService { + + private val channel = Channel<String>() + private var kafkaConsumer: Consumer<String, String>? = null + val log = logger(KafkaBasicAuthMessageConsumerService::class) + + @Volatile + var keepGoing = true + + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + val configProperties = hashMapOf<String, Any>() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + if (messageConsumerProperties.clientId != null) { + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! + } + // TODO("Security Implementation based on type") + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return KafkaConsumer(configProperties) + } + + override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return subscribe(consumerTopic, additionalConfig) + } + + + override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(consumerTopic) + log.info("Successfully consumed topic($consumerTopic)") + + thread(start = true, name = "KafkaConsumer") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + runBlocking { + consumerRecords?.forEach { consumerRecord -> + /** execute the command block */ + consumerRecord.value()?.let { + launch { + if (!channel.isClosedForSend) { + channel.send(it) + } else { + log.error("Channel is closed to receive message") + } + } + } + } + } + } + log.info("message listener shutting down.....") + } + } + return channel + } + + override suspend fun shutDown() { + /** stop the polling loop */ + keepGoing = false + /** Close the Channel */ + channel.cancel() + /** TO shutdown gracefully, need to wait for the maximum poll time */ + delay(messageConsumerProperties.pollMillSec) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 008e92437..1c93bb0fc 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -83,7 +83,6 @@ class KafkaBasicAuthMessageProducerService( } fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - log.info("Prepering templates") if (kafkaTemplate == null) { kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt new file mode 100644 index 000000000..2b84eaa78 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -0,0 +1,140 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.spyk +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.MockConsumer +import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.common.TopicPartition +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class]) +@TestPropertySource(properties = +["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" +]) +open class BlueprintMessageConsumerServiceTest { + val log = logger(BlueprintMessageConsumerServiceTest::class) + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testKafkaBasicAuthConsumerService() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList<TopicPartition> = arrayListOf() + val topicsCollection: MutableList<String> = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf() + val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", + "I am message $i") + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + val channel = spyBlueprintMessageConsumerService.subscribe(null) + launch { + channel.consumeEach { + assertTrue(it.startsWith("I am message"), "failed to get the actual message") + } + } + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testKafkaIntegration() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val channel = blueprintMessageConsumerService.subscribe(null) + launch { + channel.consumeEach { + log.info("Consumed Message : $it") + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + launch { + repeat(5) { + delay(1000) + blueprintMessageProducerService.sendMessage("this is my message($it)") + } + } + delay(10000) + blueprintMessageConsumerService.shutDown() + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 0f8367d7e..31bcc1517 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -41,10 +41,10 @@ import kotlin.test.assertTrue @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = -["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092", - "blueprintsprocessor.messageclient.sample.topic=default-topic", - "blueprintsprocessor.messageclient.sample.clientId=default-client-id" +["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) open class BlueprintMessageProducerServiceTest { @@ -52,10 +52,10 @@ open class BlueprintMessageProducerServiceTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Test - fun testKafkaBasicAuthClientService() { + fun testKafkaBasicAuthProducertService() { runBlocking { - val bluePrintMessageClientService = bluePrintMessageLibPropertyService - .blueprintMessageClientService("sample") as KafkaBasicAuthMessageProducerService + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() @@ -64,11 +64,11 @@ open class BlueprintMessageProducerServiceTest { every { mockKafkaTemplate.send(any(), any()) } returns future - val spyBluePrintMessageClientService = spyk(bluePrintMessageClientService, recordPrivateCalls = true) + val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) - every { spyBluePrintMessageClientService.messageTemplate(any()) } returns mockKafkaTemplate + every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate - val response = spyBluePrintMessageClientService.sendMessage("Testing message") + val response = spyBluePrintMessageProducerService.sendMessage("Testing message") assertTrue(response, "failed to get command response") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 626b8f911..3868440c7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt index c45ebc127..5a6ba0661 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.api.data import com.fasterxml.jackson.annotation.JsonFormat +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode import io.swagger.annotations.ApiModelProperty @@ -40,6 +41,7 @@ open class ExecutionServiceInput { " and the input for resource resolution located within the xxx-request block, contained within xxx-properties") lateinit var payload: ObjectNode @get:ApiModelProperty(hidden = true) + @get:JsonIgnore var stepData: StepData? = null } @@ -56,6 +58,7 @@ open class ExecutionServiceOutput { " and the input for resource resolution located within the xxx-request block, contained within xxx-properties") lateinit var payload: ObjectNode @get:ApiModelProperty(hidden = true) + @get:JsonIgnore var stepData: StepData? = null } diff --git a/ms/blueprintsprocessor/modules/inbounds/configs-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/configs/api/ResourceConfigSnapshotControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/configs-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/configs/api/ResourceConfigSnapshotControllerTest.kt index c3f18fcba..b4c1ad0e0 100644 --- a/ms/blueprintsprocessor/modules/inbounds/configs-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/configs/api/ResourceConfigSnapshotControllerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/configs-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/configs/api/ResourceConfigSnapshotControllerTest.kt @@ -129,14 +129,15 @@ class ResourceConfigSnapshotControllerTest { } @Test - fun `get returns 404 if entry not found`() { + fun `get returns 200 if entry not found`() { runBlocking { webTestClient .get() .uri("/api/v1/configs?resourceId=MISSING&resourceType=PNF") .exchange() - .expectStatus().isNotFound + .expectStatus().is2xxSuccessful + .expectBody() } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt new file mode 100644 index 000000000..b339903c5 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.event.ApplicationReadyEvent +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import javax.annotation.PreDestroy + +@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], + havingValue = "true") +@Service +open class BluePrintProcessingKafkaConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler) { + + val log = logger(BluePrintProcessingKafkaConsumer::class) + + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService + + companion object { + const val CONSUMER_SELECTOR = "self-service-api" + const val PRODUCER_SELECTOR = "self-service-api" + } + + @EventListener(ApplicationReadyEvent::class) + fun setupMessageListener() = runBlocking { + try { + log.info("Setting up message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR)...") + + /** Get the Message Consumer Service **/ + blueprintMessageConsumerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(CONSUMER_SELECTOR) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create consumer service ${e.message}") + } + + /** Get the Message Producer Service **/ + val blueprintMessageProducerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(PRODUCER_SELECTOR) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + + launch { + /** Subscribe to the consumer topics */ + val additionalConfig: MutableMap<String, Any> = hashMapOf() + val channel = blueprintMessageConsumerService.subscribe(additionalConfig) + channel.consumeEach { message -> + launch { + try { + log.trace("Consumed Message : $message") + val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + //TODO("In future, Message publisher configuration vary with respect to request") + /** Send the response message */ + blueprintMessageProducerService.sendMessage(executionServiceOutput) + } catch (e: Exception) { + log.error("failed in processing the consumed message : $message", e) + } + } + } + } + } catch (e: Exception) { + log.error("failed to start message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR) ", e) + } + } + + @PreDestroy + fun shutdownMessageListener() = runBlocking { + try { + log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR)...") + blueprintMessageConsumerService.shutDown() + } catch (e: Exception) { + log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) + } + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt deleted file mode 100644 index 17e157d15..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt +++ /dev/null @@ -1,58 +0,0 @@ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api - -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.core.ConsumerFactory -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 -import org.springframework.kafka.support.serializer.JsonDeserializer - -@Configuration -open class MessagingConfig { - - @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") - lateinit var groupId: String - - @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") - lateinit var bootstrapServers: String - - open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? { - val configProperties = hashMapOf<String, Any>() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java - configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name - - val deserializer = JsonDeserializer<ExecutionServiceInput>() - deserializer.setRemoveTypeHeaders(true) - deserializer.addTrustedPackages("*") - - val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java, - ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)) - - return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), - ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer)) - } - - /** - * Creation of a Kafka MessageListener Container - * - * @return KafkaListener instance. - */ - @Bean - open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { - val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() - factory.consumerFactory = consumerFactory() - return factory - } -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt deleted file mode 100644 index 54cc0c129..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright © 2019 Bell Canada - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api - -import kotlinx.coroutines.async -import kotlinx.coroutines.runBlocking -import org.apache.commons.lang3.builder.ToStringBuilder -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.slf4j.LoggerFactory -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty -import org.springframework.kafka.annotation.KafkaListener -import org.springframework.stereotype.Service - -@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true") -@Service -open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService, - private val executionServiceHandler: ExecutionServiceHandler) { - - private val log = LoggerFactory.getLogger(MessagingController::class.java)!! - - companion object { - // TODO PREFIX should be retrieved from model or from request. - const val PREFIX = "self-service-api" - const val EXECUTION_STATUS = 200 - } - - @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) - open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) { - - runBlocking { - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value())) - - // Process the message. - async { - processMessage(record.value()) - }.await() - } - } - - private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) { - - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - - if (executionServiceOutput.status.code == EXECUTION_STATUS) { - val bluePrintMessageClientService = propertyService - .blueprintMessageClientService(PREFIX) - - val payload = executionServiceOutput.payload - - log.info("The payload to publish is {}", payload) - - bluePrintMessageClientService.sendMessage(payload) - } - else { - log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage) - } - } -} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt index ea05e88c0..9629aa4b5 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt @@ -23,8 +23,6 @@ import io.grpc.testing.GrpcServerRule import org.junit.Rule import org.junit.Test import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir @@ -33,7 +31,6 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.* import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -45,9 +42,7 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @EnableAutoConfiguration @DirtiesContext -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], - excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class, - MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintManagementGRPCHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt index ce5acd400..8bedc9628 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt @@ -1,6 +1,7 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. * Modifications Copyright © 2019 Bell Canada. + * Modifications Copyright © 2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.FilterType import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner @@ -44,8 +44,8 @@ import kotlin.test.BeforeTest @RunWith(SpringRunner::class) @DirtiesContext @EnableAutoConfiguration -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], - excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", + "org.onap.ccsdk.cds.controllerblueprints"]) @TestPropertySource(locations = ["classpath:application-test.properties"]) class BluePrintProcessingGRPCHandlerTest { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt new file mode 100644 index 000000000..7d43f533f --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import io.mockk.coEvery +import io.mockk.mockk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class]) +@TestPropertySource(locations = ["classpath:application-test.properties"]) +class BluePrintProcessingKafkaConsumerTest { + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testExecutionInputMessageConsumer() { + runBlocking { + assertNotNull(bluePrintMessageLibPropertyService, + "failed to initialise bluePrintMessageLibPropertyService") + + val executionServiceHandle = mockk<ExecutionServiceHandler>() + + coEvery { executionServiceHandle.doProcess(any()) } returns mockk() + + val bluePrintProcessingKafkaConsumer = BluePrintProcessingKafkaConsumer(bluePrintMessageLibPropertyService, + executionServiceHandle) + + launch { + bluePrintProcessingKafkaConsumer.setupMessageListener() + } + delay(100) + bluePrintProcessingKafkaConsumer.shutdownMessageListener() + } + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt new file mode 100644 index 000000000..fc6c4890c --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright © 2019 Bell Canada + * Modifications Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.autoconfigure.security.SecurityProperties +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest +import org.springframework.context.annotation.ComponentScan +import org.springframework.core.io.ByteArrayResource +import org.springframework.http.client.MultipartBodyBuilder +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import org.springframework.test.web.reactive.server.WebTestClient +import org.springframework.test.web.reactive.server.returnResult +import org.springframework.web.reactive.function.BodyInserters +import java.io.File +import java.nio.file.Files +import java.nio.file.Paths +import kotlin.test.Test + +@RunWith(SpringRunner::class) +@EnableAutoConfiguration +@ContextConfiguration(classes = [ExecutionServiceControllerTest::class, SecurityProperties::class]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) +@TestPropertySource(locations = ["classpath:application-test.properties"]) +@DirtiesContext +@WebFluxTest +class ExecutionServiceControllerTest { + + private val log = LoggerFactory.getLogger(ExecutionServiceControllerTest::class.java)!! + + @Autowired + lateinit var webTestClient: WebTestClient + + var event: ExecutionServiceInput? = null + + @Before + fun setup() { + deleteDir("target", "blueprints") + } + + @After + fun clean() { + deleteDir("target", "blueprints") + } + + @Test + fun uploadBluePrint() { + runBlocking { + val body = MultipartBodyBuilder().apply { + part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) { + override fun getFilename(): String { + return "test-cba.zip" + } + }) + }.build() + + webTestClient + .post() + .uri("/api/v1/execution-service/upload") + .body(BodyInserters.fromMultipartData(body)) + .exchange() + .expectStatus().isOk + .returnResult<String>() + .responseBody + .awaitSingle() + } + } + + private fun loadCbaArchive(): File { + return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile() + } +} + + diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index d9e352bff..a480b115b 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.security.SecurityProperties import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.FilterType import org.springframework.core.io.ByteArrayResource import org.springframework.http.client.MultipartBodyBuilder import org.springframework.test.context.ContextConfiguration @@ -49,9 +48,10 @@ import kotlin.test.assertTrue @RunWith(SpringRunner::class) @WebFluxTest -@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class]) -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"], - excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE))) +@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, + BluePrintCatalogService::class, SecurityProperties::class]) +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", + "org.onap.ccsdk.cds.controllerblueprints"]) @TestPropertySource(locations = ["classpath:application-test.properties"]) class ExecutionServiceHandlerTest { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt deleted file mode 100644 index facbec585..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright © 2019 Bell Canada - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib - -import com.fasterxml.jackson.databind.node.ObjectNode -import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.runBlocking -import org.apache.commons.lang.builder.ToStringBuilder -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.StringDeserializer -import org.junit.After -import org.junit.Before -import org.junit.Ignore -import org.junit.Test -import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController -import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.boot.autoconfigure.security.SecurityProperties -import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration -import org.springframework.core.io.ByteArrayResource -import org.springframework.http.client.MultipartBodyBuilder -import org.springframework.kafka.annotation.EnableKafka -import org.springframework.kafka.annotation.KafkaListener -import org.springframework.kafka.annotation.PartitionOffset -import org.springframework.kafka.annotation.TopicPartition -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.core.ConsumerFactory -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.test.context.EmbeddedKafka -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.SpringRunner -import org.springframework.test.web.reactive.server.WebTestClient -import org.springframework.test.web.reactive.server.returnResult -import org.springframework.web.reactive.function.BodyInserters -import java.io.File -import java.nio.file.Files -import java.nio.file.Paths -import kotlin.test.assertNotNull -//FIXME("testReceive method is failing in server build, It is not stable, may be timing issue.") -@Ignore -@RunWith(SpringRunner::class) -@EnableAutoConfiguration -@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class]) -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"]) -@TestPropertySource(locations = ["classpath:application-test.properties"]) -@DirtiesContext -@EmbeddedKafka(ports = [9092]) -@WebFluxTest -class MessagingControllerTest { - - private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!! - - @Autowired - lateinit var controller: MessagingController - - @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}") - lateinit var topicUsedForConsumer: String - - @Autowired - lateinit var kt: KafkaTemplate<String, ExecutionServiceInput> - - @Autowired - lateinit var webTestClient: WebTestClient - - var event: ExecutionServiceInput? = null - - @Before - fun setup() { - deleteDir("target", "blueprints") - uploadBluePrint() - } - - @After - fun clean() { - deleteDir("target", "blueprints") - } - - @Test - fun testReceive() { - val samplePayload = "{\n" + - " \"resource-assignment-request\": {\n" + - " \"artifact-name\": [\"hostname\"],\n" + - " \"store-result\": true,\n" + - " \"resource-assignment-properties\" : {\n" + - " \"hostname\": \"demo123\"\n" + - " }\n" + - " }\n" + - " }" - - kt.defaultTopic = topicUsedForConsumer - - val input = ExecutionServiceInput().apply { - commonHeader = CommonHeader().apply { - originatorId = "1" - requestId = "1234" - subRequestId = "1234-1234" - } - - actionIdentifiers = ActionIdentifiers().apply { - blueprintName = "golden" - blueprintVersion = "1.0.0" - actionName = "resource-assignment" - mode = "sync" - } - - stepData = StepData().apply { - name = "resource-assignment" - } - - payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode - } - - kt.sendDefault(input) - log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input)) - - Thread.sleep(1000) - - assertNotNull(event) - } - - @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])]) - fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) { - event = receivedEvent - } - - private fun uploadBluePrint() { - runBlocking { - val body = MultipartBodyBuilder().apply { - part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) { - override fun getFilename(): String { - return "test-cba.zip" - } - }) - }.build() - - webTestClient - .post() - .uri("/api/v1/execution-service/upload") - .body(BodyInserters.fromMultipartData(body)) - .exchange() - .expectStatus().isOk - .returnResult<String>() - .responseBody - .awaitSingle() - } - } - - private fun loadCbaArchive():File { - return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile() - } - - @Configuration - @EnableKafka - open class ConsumerConfiguration { - - @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") - lateinit var bootstrapServers: String - - @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") - lateinit var groupId:String - - @Bean - open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? { - val configProperties = hashMapOf<String, Any>() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" - configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000 - - return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), - JsonDeserializer(ExecutionServiceInput::class.java)) - } - - @Bean - open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { - val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() - factory.consumerFactory = consumerFactory2() - return factory - } - } -} - - diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt deleted file mode 100644 index dc1f38a63..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright © 2019 Bell Canada - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.StringSerializer -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.kafka.annotation.EnableKafka -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.serializer.JsonSerializer - -@Configuration -open class ProducerConfiguration { - - @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") - lateinit var bootstrapServers: String - - open fun kpf(): ProducerFactory<String, ExecutionServiceInput> { - val configs = HashMap<String, Any>() - configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java - return DefaultKafkaProducerFactory(configs) - } - - @Bean - open fun kt(): KafkaTemplate<String, ExecutionServiceInput> { - return KafkaTemplate<String, ExecutionServiceInput>(kpf()) - } -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties index ab3bac88f..d18b70010 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties @@ -33,10 +33,15 @@ blueprints.processor.functions.python.executor.executionPath=./../../../../compo blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints # Kafka-message-lib Configuration -blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true -blueprintsprocessor.messageclient.self-service-api.topic=producer.t -blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth -blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092 -blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t -blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id -blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id +blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false +blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t +blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id +blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10 + +blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id +blueprintsprocessor.messageproducer.self-service-api.topic=producer.t diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml index 0c8d93bf0..dd81657a4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml @@ -1,35 +1,35 @@ -<!--
- ~ Copyright © 2017-2018 AT&T Intellectual Property.
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <!-- encoders are assigned the type
- ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
- </encoder>
- </appender>
-
-
- <logger name="org.springframework" level="warn"/>
- <logger name="org.hibernate" level="info"/>
- <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
-
- <root level="warn">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>
+<!-- + ~ Copyright © 2017-2018 AT&T Intellectual Property. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> + </encoder> + </appender> + + + <logger name="org.springframework" level="warn"/> + <logger name="org.hibernate" level="info"/> + <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> + + <root level="warn"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt index 08bc6c3fd..b74b7e4cf 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt @@ -54,6 +54,10 @@ fun String.asJsonPrimitive(): TextNode { return TextNode(this) } +inline fun <reified T : Any> String.jsonAsType(): T { + return JacksonUtils.readValue<T>(this.trim()) +} + // If you know the string is json content, then use the function directly fun String.jsonAsJsonType(): JsonNode { return JacksonUtils.jsonNode(this.trim()) |