aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds/selfservice-api
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-06-18 18:16:38 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-06-22 14:46:22 -0400
commitc2046c09fd65a9ba4d000beacb3836869db771f2 (patch)
tree847f9bb3824b0a906589567e819673b4c7360207 /ms/blueprintsprocessor/modules/inbounds/selfservice-api
parent214c8dfe0e56aff0fe0c2a3a5d7f76f2a4972710 (diff)
Kafka Audit Service : Improve error handling and miscellaneous refactoring
When Kafka Audit Service fails it no longer stops Blueprint Processor execution * Add error handling when trying to hide sensitive data * Add error handling when trying to send kafka message * Set timeout for blocking loop when sending messages with kafka producer -> When broker is not available producer tries to reconnect in a blocking loop * Refactor Audit Service interface to give more explict name for publish methods * Modify publishExecutionOutput() to a non-blocking function Issue-ID: CCSDK-2459 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: I809a5f34f81889aa9eed499608348f149984bc38
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt155
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt10
5 files changed, 96 insertions, 87 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
index e9d0b7b51..6c62aae88 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
@@ -69,13 +69,13 @@ class ExecutionServiceHandler(
responseObserver.onCompleted()
}
else -> {
- publishAuditService.publish(executionServiceInput)
+ publishAuditService.publishExecutionInput(executionServiceInput)
val executionServiceOutput = response(
executionServiceInput,
"Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
true
)
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
responseObserver.onNext(
executionServiceOutput.toProto()
)
@@ -93,9 +93,9 @@ class ExecutionServiceHandler(
log.info("processing request id $requestId")
- try {
- publishAuditService.publish(executionServiceInput)
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ try {
/** Check Blueprint is needed for this request */
if (checkServiceFunction(executionServiceInput)) {
executionServiceOutput = executeServiceFunction(executionServiceInput)
@@ -121,7 +121,7 @@ class ExecutionServiceHandler(
executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
return executionServiceOutput
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 6ff217942..fca73981e 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
@@ -70,10 +69,17 @@ class KafkaPublishAuditService(
* The correlation UUID is used to link the input to its output.
* Sensitive data within the request are hidden.
*/
- override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
- this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
- this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ try {
+ this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+ this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ } catch (e: Exception) {
+ var errMsg =
+ if (e.message != null) "ERROR : ${e.message}"
+ else "ERROR : Failed to send execution request to Kafka."
+ log.error(errMsg)
+ }
}
/**
@@ -81,10 +87,17 @@ class KafkaPublishAuditService(
* The correlation UUID is used to link the output to its input.
* A correlation UUID is added to link the input to its output.
*/
- override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
executionServiceOutput.correlationUUID = correlationUUID
- this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
- this.outputInstance!!.sendMessage(executionServiceOutput)
+ try {
+ this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+ this.outputInstance!!.sendMessage(executionServiceOutput)
+ } catch (e: Exception) {
+ var errMsg =
+ if (e.message != null) "ERROR : $e"
+ else "ERROR : Failed to send execution request to Kafka."
+ log.error(errMsg)
+ }
}
/**
@@ -101,15 +114,8 @@ class KafkaPublishAuditService(
* Create a kafka producer instance.
*/
private fun createInstance(selector: String): BlueprintMessageProducerService {
- log.info(
- "Setting up message producer($selector)..."
- )
- return try {
- bluePrintMessageLibPropertyService
- .blueprintMessageProducerService(selector)
- } catch (e: Exception) {
- throw BluePrintProcessorException("failed to create producer service ${e.message}")
- }
+ log.info("Setting up message producer($selector)...")
+ return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector)
}
/**
@@ -134,66 +140,73 @@ class KafkaPublishAuditService(
if (blueprintName == "default") return clonedExecutionServiceInput
- if (clonedExecutionServiceInput.payload
- .path("$workflowName-request").has("$workflowName-properties")) {
-
- /** Retrieving sensitive input parameters */
- val requestId = clonedExecutionServiceInput.commonHeader.requestId
- val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
-
- val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-
- val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
- val blueprintContext = blueprintRuntimeService.bluePrintContext()
-
- /** Looking for node templates defined as component-resource-resolution */
- val nodeTemplates = blueprintContext.nodeTemplates()
- nodeTemplates!!.forEach { nodeTemplate ->
- val nodeTemplateName = nodeTemplate.key
- val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
- if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
- val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
- val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
- val propertyAssignments: MutableMap<String, JsonNode> =
- blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
- ?: hashMapOf()
-
- /** Getting values define in artifact-prefix-names */
- val input = executionServiceInput.payload.get("$workflowName-request")
- blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
- val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
- val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
- val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
- BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
- nodeTemplateName,
- ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
- artifactPrefixNamesNode!!)
-
- val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
-
- /** Storing mapping entries with metadata log-protect set to true */
- val sensitiveParameters: List<String> = artifactPrefixNames
- .map { "$it-mapping" }
- .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
- .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
- .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
- .map { it.name }
-
- /** Hiding sensitive input parameters from the request */
- var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
- .path("$workflowName-request")
- .path("$workflowName-properties") as ObjectNode
-
- sensitiveParameters.forEach { sensitiveParameter ->
- if (workflowProperties.has(sensitiveParameter)) {
- workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+ try {
+ if (clonedExecutionServiceInput.payload
+ .path("$workflowName-request").has("$workflowName-properties")) {
+
+ /** Retrieving sensitive input parameters */
+ val requestId = clonedExecutionServiceInput.commonHeader.requestId
+ val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+ val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+ val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+ val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+ /** Looking for node templates defined as component-resource-resolution */
+ val nodeTemplates = blueprintContext.nodeTemplates()
+ nodeTemplates!!.forEach { nodeTemplate ->
+ val nodeTemplateName = nodeTemplate.key
+ val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+ if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+ val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+ val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+ val propertyAssignments: MutableMap<String, JsonNode> =
+ blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+ ?: hashMapOf()
+
+ /** Getting values define in artifact-prefix-names */
+ val input = executionServiceInput.payload.get("$workflowName-request")
+ blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+ val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+ val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+ val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+ BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+ nodeTemplateName,
+ ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+ artifactPrefixNamesNode!!)
+
+ val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+ /** Storing mapping entries with metadata log-protect set to true */
+ val sensitiveParameters: List<String> = artifactPrefixNames
+ .map { "$it-mapping" }
+ .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+ .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+ .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+ .map { it.name }
+
+ /** Hiding sensitive input parameters from the request */
+ var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+ .path("$workflowName-request")
+ .path("$workflowName-properties") as ObjectNode
+
+ sensitiveParameters.forEach { sensitiveParameter ->
+ if (workflowProperties.has(sensitiveParameter)) {
+ workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+ }
}
}
}
}
+ } catch (e: Exception) {
+ val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, e)
+ clonedExecutionServiceInput.payload.replace(
+ "$workflowName-request",
+ "$errMsg $e".asJsonPrimitive())
}
-
return clonedExecutionServiceInput
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
index eb66e411a..6ad73d88a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
@@ -39,9 +39,9 @@ class NoPublishAuditService : PublishAuditService {
log.info("Audit service is disabled")
}
- override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
}
- override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
index 72f493187..67473c807 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
@@ -20,6 +20,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
interface PublishAuditService {
- suspend fun publish(executionServiceInput: ExecutionServiceInput)
- fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
+ suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput)
+ suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
index 191456296..70e1ed0fd 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
@@ -16,7 +16,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-import io.mockk.verify
import io.mockk.coVerify
import io.mockk.Runs
import io.mockk.coEvery
@@ -102,7 +101,7 @@ class ExecutionServiceHandlerTest {
publishAuditService
)
- coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs
+ coEvery { publishAuditService.publishExecutionInput(ExecutionServiceInput()) } just Runs
var executionServiceOutput: ExecutionServiceOutput? = null
runBlocking {
@@ -110,11 +109,8 @@ class ExecutionServiceHandlerTest {
}
coVerify {
- publishAuditService.publish(executionServiceInput)
- }
-
- verify {
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!)
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput!!)
}
}
}