summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt155
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt10
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt30
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt23
11 files changed, 150 insertions, 110 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index b07d64388..d76621c26 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties()
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
var clientId: String? = null
- // strongest producing guarantee
- var acks: String = "all"
- var retries: Int = 0
- // ensure we don't push duplicates
- var enableIdempotence: Boolean = true
+ var acks: String = "all" // strongest producing guarantee
+ var maxBlockMs: Int = 250 // max blocking time in ms to send a message
+ var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
+ var enableIdempotence: Boolean = true // ensure we don't push duplicates
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
configProps[ProducerConfig.ACKS_CONFIG] = acks
+ configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
+ configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
if (clientId != null) {
configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index c659fdb8b..e9bc5d8ad 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks)
- fun retries(retries: Int) = retries(retries.asJsonPrimitive())
+ fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive())
- fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries)
+ fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs)
+
+ fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive())
+
+ fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs)
fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive())
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index 612a57d23..b1af230b9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -35,7 +35,8 @@ class MessagePropertiesDSLTest {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
acks("all")
- retries(3)
+ maxBlockMs(0)
+ reconnectBackOffMs(60 * 60 * 1000)
enableIdempotence(true)
topic("sample-topic")
truststore("/path/to/truststore.jks")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index da7394998..537dab1ba 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -100,6 +100,8 @@ open class BlueprintMessageProducerServiceTest {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+ ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
index e9d0b7b51..6c62aae88 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
@@ -69,13 +69,13 @@ class ExecutionServiceHandler(
responseObserver.onCompleted()
}
else -> {
- publishAuditService.publish(executionServiceInput)
+ publishAuditService.publishExecutionInput(executionServiceInput)
val executionServiceOutput = response(
executionServiceInput,
"Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
true
)
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
responseObserver.onNext(
executionServiceOutput.toProto()
)
@@ -93,9 +93,9 @@ class ExecutionServiceHandler(
log.info("processing request id $requestId")
- try {
- publishAuditService.publish(executionServiceInput)
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ try {
/** Check Blueprint is needed for this request */
if (checkServiceFunction(executionServiceInput)) {
executionServiceOutput = executeServiceFunction(executionServiceInput)
@@ -121,7 +121,7 @@ class ExecutionServiceHandler(
executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
return executionServiceOutput
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 6ff217942..fca73981e 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
@@ -70,10 +69,17 @@ class KafkaPublishAuditService(
* The correlation UUID is used to link the input to its output.
* Sensitive data within the request are hidden.
*/
- override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
- this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
- this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ try {
+ this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+ this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ } catch (e: Exception) {
+ var errMsg =
+ if (e.message != null) "ERROR : ${e.message}"
+ else "ERROR : Failed to send execution request to Kafka."
+ log.error(errMsg)
+ }
}
/**
@@ -81,10 +87,17 @@ class KafkaPublishAuditService(
* The correlation UUID is used to link the output to its input.
* A correlation UUID is added to link the input to its output.
*/
- override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
executionServiceOutput.correlationUUID = correlationUUID
- this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
- this.outputInstance!!.sendMessage(executionServiceOutput)
+ try {
+ this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+ this.outputInstance!!.sendMessage(executionServiceOutput)
+ } catch (e: Exception) {
+ var errMsg =
+ if (e.message != null) "ERROR : $e"
+ else "ERROR : Failed to send execution request to Kafka."
+ log.error(errMsg)
+ }
}
/**
@@ -101,15 +114,8 @@ class KafkaPublishAuditService(
* Create a kafka producer instance.
*/
private fun createInstance(selector: String): BlueprintMessageProducerService {
- log.info(
- "Setting up message producer($selector)..."
- )
- return try {
- bluePrintMessageLibPropertyService
- .blueprintMessageProducerService(selector)
- } catch (e: Exception) {
- throw BluePrintProcessorException("failed to create producer service ${e.message}")
- }
+ log.info("Setting up message producer($selector)...")
+ return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector)
}
/**
@@ -134,66 +140,73 @@ class KafkaPublishAuditService(
if (blueprintName == "default") return clonedExecutionServiceInput
- if (clonedExecutionServiceInput.payload
- .path("$workflowName-request").has("$workflowName-properties")) {
-
- /** Retrieving sensitive input parameters */
- val requestId = clonedExecutionServiceInput.commonHeader.requestId
- val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
-
- val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-
- val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
- val blueprintContext = blueprintRuntimeService.bluePrintContext()
-
- /** Looking for node templates defined as component-resource-resolution */
- val nodeTemplates = blueprintContext.nodeTemplates()
- nodeTemplates!!.forEach { nodeTemplate ->
- val nodeTemplateName = nodeTemplate.key
- val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
- if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
- val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
- val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
- val propertyAssignments: MutableMap<String, JsonNode> =
- blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
- ?: hashMapOf()
-
- /** Getting values define in artifact-prefix-names */
- val input = executionServiceInput.payload.get("$workflowName-request")
- blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
- val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
- val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
- val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
- BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
- nodeTemplateName,
- ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
- artifactPrefixNamesNode!!)
-
- val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
-
- /** Storing mapping entries with metadata log-protect set to true */
- val sensitiveParameters: List<String> = artifactPrefixNames
- .map { "$it-mapping" }
- .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
- .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
- .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
- .map { it.name }
-
- /** Hiding sensitive input parameters from the request */
- var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
- .path("$workflowName-request")
- .path("$workflowName-properties") as ObjectNode
-
- sensitiveParameters.forEach { sensitiveParameter ->
- if (workflowProperties.has(sensitiveParameter)) {
- workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+ try {
+ if (clonedExecutionServiceInput.payload
+ .path("$workflowName-request").has("$workflowName-properties")) {
+
+ /** Retrieving sensitive input parameters */
+ val requestId = clonedExecutionServiceInput.commonHeader.requestId
+ val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+ val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+ val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+ val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+ /** Looking for node templates defined as component-resource-resolution */
+ val nodeTemplates = blueprintContext.nodeTemplates()
+ nodeTemplates!!.forEach { nodeTemplate ->
+ val nodeTemplateName = nodeTemplate.key
+ val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+ if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+ val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+ val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+ val propertyAssignments: MutableMap<String, JsonNode> =
+ blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+ ?: hashMapOf()
+
+ /** Getting values define in artifact-prefix-names */
+ val input = executionServiceInput.payload.get("$workflowName-request")
+ blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+ val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+ val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+ val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+ BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+ nodeTemplateName,
+ ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+ artifactPrefixNamesNode!!)
+
+ val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+ /** Storing mapping entries with metadata log-protect set to true */
+ val sensitiveParameters: List<String> = artifactPrefixNames
+ .map { "$it-mapping" }
+ .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+ .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+ .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+ .map { it.name }
+
+ /** Hiding sensitive input parameters from the request */
+ var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+ .path("$workflowName-request")
+ .path("$workflowName-properties") as ObjectNode
+
+ sensitiveParameters.forEach { sensitiveParameter ->
+ if (workflowProperties.has(sensitiveParameter)) {
+ workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+ }
}
}
}
}
+ } catch (e: Exception) {
+ val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, e)
+ clonedExecutionServiceInput.payload.replace(
+ "$workflowName-request",
+ "$errMsg $e".asJsonPrimitive())
}
-
return clonedExecutionServiceInput
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
index eb66e411a..6ad73d88a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
@@ -39,9 +39,9 @@ class NoPublishAuditService : PublishAuditService {
log.info("Audit service is disabled")
}
- override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
}
- override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
index 72f493187..67473c807 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
@@ -20,6 +20,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
interface PublishAuditService {
- suspend fun publish(executionServiceInput: ExecutionServiceInput)
- fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
+ suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput)
+ suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
index 191456296..70e1ed0fd 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
@@ -16,7 +16,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-import io.mockk.verify
import io.mockk.coVerify
import io.mockk.Runs
import io.mockk.coEvery
@@ -102,7 +101,7 @@ class ExecutionServiceHandlerTest {
publishAuditService
)
- coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs
+ coEvery { publishAuditService.publishExecutionInput(ExecutionServiceInput()) } just Runs
var executionServiceOutput: ExecutionServiceOutput? = null
runBlocking {
@@ -110,11 +109,8 @@ class ExecutionServiceHandlerTest {
}
coVerify {
- publishAuditService.publish(executionServiceInput)
- }
-
- verify {
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!)
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput!!)
}
}
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
index 211bf76fb..4cd809778 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
@@ -105,7 +105,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
/** Resolve and validate lock properties */
implementation.lock?.apply {
val resolvedValues = bluePrintRuntimeService.resolvePropertyAssignments(
- nodeTemplateName,
+ BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
interfaceName,
mutableMapOf("key" to this.key, "acquireTimeout" to this.acquireTimeout))
this.key = resolvedValues["key"] ?: "".asJsonType()
@@ -153,21 +153,14 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
}
override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
- prepareRequestNB(executionServiceInput)
- return implementation.lock?.let {
- bluePrintClusterService.clusterLock("${it.key.textValue()}@$CDS_LOCK_GROUP")
- .executeWithLock(it.acquireTimeout.intValue().times(1000).toLong()) {
- applyNBWithTimeout(executionServiceInput)
- }
- } ?: applyNBWithTimeout(executionServiceInput)
- }
-
- private suspend fun applyNBWithTimeout(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
try {
- withTimeout((implementation.timeout * 1000).toLong()) {
- log.debug("DEBUG::: AbstractComponentFunction.withTimeout section ${implementation.timeout} seconds")
- processNB(executionServiceInput)
- }
+ prepareRequestNB(executionServiceInput)
+ implementation.lock?.let {
+ bluePrintClusterService.clusterLock("${it.key.textValue()}@$CDS_LOCK_GROUP")
+ .executeWithLock(it.acquireTimeout.intValue().times(1000).toLong()) {
+ applyNBWithTimeout(executionServiceInput)
+ }
+ } ?: applyNBWithTimeout(executionServiceInput)
} catch (runtimeException: RuntimeException) {
log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException)
recoverNB(runtimeException, executionServiceInput)
@@ -175,6 +168,13 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
return prepareResponseNB()
}
+ private suspend fun applyNBWithTimeout(executionServiceInput: ExecutionServiceInput) =
+ withTimeout((implementation.timeout * 1000).toLong()) {
+ log.debug("DEBUG::: AbstractComponentFunction.withTimeout " +
+ "section ${implementation.timeout} seconds")
+ processNB(executionServiceInput)
+ }
+
fun getOperationInput(key: String): JsonNode {
return operationInputs[key]
?: throw BluePrintProcessorException("couldn't get the operation input($key) value.")
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
index e0b690573..0f9dfd157 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.mockk.every
import io.mockk.mockk
+import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.runBlocking
import org.junit.Test
@@ -53,6 +54,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.junit4.SpringRunner
+import java.lang.RuntimeException
import kotlin.test.BeforeTest
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
@@ -204,6 +206,27 @@ class AbstractComponentFunctionTest {
}
@Test
+ fun `applyNB should catch exceptions and call recoverNB`() {
+ val exception = RuntimeException("Intentional test exception")
+ every {
+ bluePrintRuntimeService.resolvePropertyAssignments(any(), any(), any())
+ } throws exception
+ every {
+ blueprintContext.nodeTemplateOperationImplementation(any(), any(), any())
+ } returns Implementation().apply {
+ this.lock = LockAssignment().apply { this.key = "testing-lock".asJsonType() }
+ }
+
+ val component: AbstractComponentFunction = spyk(SampleComponent())
+ component.bluePrintRuntimeService = bluePrintRuntimeService
+ component.bluePrintClusterService = blueprintClusterService
+ val input = getMockedInput(bluePrintRuntimeService)
+
+ runBlocking { component.applyNB(input) }
+ verify { runBlocking { component.recoverNB(exception, input) } }
+ }
+
+ @Test
fun `applyNB - when lock is present use ClusterLock`() {
val lockName = "testing-lock"