summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2020-04-10 18:41:27 +0000
committerGerrit Code Review <gerrit@onap.org>2020-04-10 18:41:27 +0000
commite3bc9a69ee93bbe5d2675ce9468adfb76b25d3d9 (patch)
treeac1769518bdec5ff09a6704f9008c4327717bef8 /ms/blueprintsprocessor
parenta97c7dad9ff1e1cc8aa1c79a013e1139e121a2d7 (diff)
parente9dd04c53d8aff175909f04f33acae506800e25f (diff)
Merge "Publish execution input/output into Kafka topics"
Diffstat (limited to 'ms/blueprintsprocessor')
-rwxr-xr-xms/blueprintsprocessor/application/src/main/resources/application-dev.properties17
-rw-r--r--ms/blueprintsprocessor/application/src/test/resources/application.properties28
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt6
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt27
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt42
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt184
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt47
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt25
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt47
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties23
10 files changed, 386 insertions, 60 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
index 5beebd8e8..ad38883f7 100755
--- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
+++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
@@ -129,13 +129,20 @@ blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
# Message prioritization kakfa properties, Enable if Prioritization service is needed
# Deploy message-prioritization function along with blueprintsprocessor application.
diff --git a/ms/blueprintsprocessor/application/src/test/resources/application.properties b/ms/blueprintsprocessor/application/src/test/resources/application.properties
index cb3419397..c6e957b32 100644
--- a/ms/blueprintsprocessor/application/src/test/resources/application.properties
+++ b/ms/blueprintsprocessor/application/src/test/resources/application.properties
@@ -74,14 +74,26 @@ blueprintsprocessor.cliExecutor.enabled=true
blueprintsprocessor.remoteScriptCommand.enabled=false
-# Kafka-message-lib Configuration
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
+# Kafka-message-lib Configurations
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
endpoints.user.name=eHbVUbJAj4AG2522cSbrOQ==
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt
index d94985400..58d0f13c2 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiModelProperty
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import java.util.Date
+import java.util.UUID
/**
* BlueprintProcessorData
@@ -33,7 +34,8 @@ import java.util.Date
*/
open class ExecutionServiceInput {
-
+ @get:ApiModelProperty(required = false, hidden = true)
+ var correlationUUID: String = UUID.randomUUID().toString()
@get:ApiModelProperty(required = true, value = "Headers providing request context.")
lateinit var commonHeader: CommonHeader
@get:ApiModelProperty(required = true, value = "Provide information about the action to execute.")
@@ -51,6 +53,8 @@ open class ExecutionServiceInput {
}
open class ExecutionServiceOutput {
+ @get:ApiModelProperty(required = false, hidden = true)
+ var correlationUUID: String? = null
@get:ApiModelProperty(required = true, value = "Headers providing request context.")
lateinit var commonHeader: CommonHeader
@get:ApiModelProperty(required = true, value = "Provide information about the action to execute.")
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index d0b4df888..49f2a50d5 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -51,15 +51,13 @@ open class BluePrintProcessingKafkaConsumer(
companion object {
const val CONSUMER_SELECTOR = "self-service-api"
- const val PRODUCER_SELECTOR = "self-service-api"
}
@EventListener(ApplicationReadyEvent::class)
fun setupMessageListener() = runBlocking {
try {
log.info(
- "Setting up message consumer($CONSUMER_SELECTOR) and " +
- "message producer($PRODUCER_SELECTOR)..."
+ "Setting up message consumer($CONSUMER_SELECTOR)"
)
/** Get the Message Consumer Service **/
@@ -74,18 +72,6 @@ open class BluePrintProcessingKafkaConsumer(
throw BluePrintProcessorException("failed to create consumer service ${e.message}")
}
- /** Get the Message Producer Service **/
- val blueprintMessageProducerService = try {
- bluePrintMessageLibPropertyService
- .blueprintMessageProducerService(PRODUCER_SELECTOR)
- } catch (e: BluePrintProcessorException) {
- val errorMsg = "Failed creating Kafka producer message service."
- throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
- "Wrong Kafka selector provided or internal error in Kafka service.")
- } catch (e: Exception) {
- throw BluePrintProcessorException("failed to create producer service ${e.message}")
- }
-
launch {
/** Subscribe to the consumer topics */
val additionalConfig: MutableMap<String, Any> = hashMapOf()
@@ -96,10 +82,7 @@ open class BluePrintProcessingKafkaConsumer(
ph.register()
log.trace("Consumed Message : $message")
val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
- val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- // TODO("In future, Message publisher configuration vary with respect to request")
- /** Send the response message */
- blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ executionServiceHandler.doProcess(executionServiceInput)
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
} finally {
@@ -110,8 +93,7 @@ open class BluePrintProcessingKafkaConsumer(
}
} catch (e: Exception) {
log.error(
- "failed to start message consumer($CONSUMER_SELECTOR) and " +
- "message producer($PRODUCER_SELECTOR) ", e
+ "failed to start message consumer($CONSUMER_SELECTOR) ", e
)
}
}
@@ -120,8 +102,7 @@ open class BluePrintProcessingKafkaConsumer(
fun shutdownMessageListener() = runBlocking {
try {
log.info(
- "Shutting down message consumer($CONSUMER_SELECTOR) and " +
- "message producer($PRODUCER_SELECTOR)..."
+ "Shutting down message consumer($CONSUMER_SELECTOR)"
)
blueprintMessageConsumerService.shutDown()
ph.arriveAndAwaitAdvance()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
index 9524e375e..74c4b00e4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
@@ -44,7 +44,8 @@ class ExecutionServiceHandler(
private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
private val blueprintsProcessorCatalogService: BluePrintCatalogService,
private val bluePrintWorkflowExecutionService:
- BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
+ BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+ private val publishAuditService: PublishAuditService
) {
private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
@@ -67,33 +68,44 @@ class ExecutionServiceHandler(
responseObserver.onNext(executionServiceOutput.toProto())
responseObserver.onCompleted()
}
- else -> responseObserver.onNext(
- response(
- executionServiceInput,
- "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
- true
- ).toProto()
- )
+ else -> {
+ publishAuditService.publish(executionServiceInput)
+ val executionServiceOutput = response(
+ executionServiceInput,
+ "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+ true
+ )
+ publishAuditService.publish(executionServiceOutput)
+ responseObserver.onNext(
+ executionServiceOutput.toProto()
+ )
+ }
}
}
suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
val requestId = executionServiceInput.commonHeader.requestId
- log.info("processing request id $requestId")
val actionIdentifiers = executionServiceInput.actionIdentifiers
val blueprintName = actionIdentifiers.blueprintName
val blueprintVersion = actionIdentifiers.blueprintVersion
+
+ lateinit var executionServiceOutput: ExecutionServiceOutput
+
+ log.info("processing request id $requestId")
+
try {
+ publishAuditService.publish(executionServiceInput)
+
/** Check Blueprint is needed for this request */
if (checkServiceFunction(executionServiceInput)) {
- return executeServiceFunction(executionServiceInput)
+ executionServiceOutput = executeServiceFunction(executionServiceInput)
} else {
val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
log.info("blueprint base path $basePath")
val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
- val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+ executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
blueprintRuntimeService,
executionServiceInput, hashMapOf()
)
@@ -101,14 +113,16 @@ class ExecutionServiceHandler(
val errors = blueprintRuntimeService.getBluePrintError().errors
if (errors.isNotEmpty()) {
val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
- setErrorStatus(errorMessage, output.status)
+ setErrorStatus(errorMessage, executionServiceOutput.status)
}
- return output
}
} catch (e: Exception) {
log.error("fail processing request id $requestId", e)
- return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+ executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
+
+ publishAuditService.publish(executionServiceOutput)
+ return executionServiceOutput
}
/** If the blueprint name is default, It means no blueprint is needed for the execution */
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
new file mode 100644
index 000000000..129e7a54d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -0,0 +1,184 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils
+import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+import javax.annotation.PostConstruct
+
+/**
+ * Audit service used to produce execution service input and output message
+ * sent into dedicated kafka topics.
+ */
+@ConditionalOnProperty(
+ name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
+ havingValue = "true"
+)
+@Service
+class KafkaPublishAuditService(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val blueprintsProcessorCatalogService: BluePrintCatalogService
+) : PublishAuditService {
+
+ private var inputInstance: BlueprintMessageProducerService? = null
+ private var outputInstance: BlueprintMessageProducerService? = null
+
+ private lateinit var correlationUUID: String
+
+ private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
+
+ companion object {
+ const val INPUT_SELECTOR = "self-service-api.audit.request"
+ const val OUTPUT_SELECTOR = "self-service-api.audit.response"
+ }
+
+ @PostConstruct
+ private fun init() {
+ log.info("Kakfa audit service is enabled")
+ }
+
+ /**
+ * Publish execution input into a kafka topic.
+ * The correlation UUID is used to link the input to its output.
+ * Sensitive data within the request are hidden.
+ */
+ override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ this.correlationUUID = executionServiceInput.correlationUUID
+ val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+ this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+ this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ }
+
+ /**
+ * Publish execution output into a kafka topic.
+ * The correlation UUID is used to link the output to its input.
+ * A correlation UUID is added to link the input to its output.
+ */
+ override fun publish(executionServiceOutput: ExecutionServiceOutput) {
+ executionServiceOutput.correlationUUID = this.correlationUUID
+ this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+ this.outputInstance!!.sendMessage(executionServiceOutput)
+ }
+
+ /**
+ * Return the input kafka producer instance using a selector.
+ */
+ private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector)
+
+ /**
+ * Return the output kafka producer instance using a selector.
+ */
+ private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector)
+
+ /**
+ * Create a kafka producer instance.
+ */
+ private fun createInstance(selector: String): BlueprintMessageProducerService {
+ log.info(
+ "Setting up message producer($selector)..."
+ )
+ return try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService(selector)
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create producer service ${e.message}")
+ }
+ }
+
+ /**
+ * Hide sensitive data in the request.
+ * Sensitive data are declared in the resource resolution mapping using
+ * the property metadata "log-protect" set to true.
+ */
+ private suspend fun hideSensitiveData(
+ executionServiceInput: ExecutionServiceInput
+ ): ExecutionServiceInput {
+
+ var clonedExecutionServiceInput = ExecutionServiceInput().apply {
+ correlationUUID = executionServiceInput.correlationUUID
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ payload = executionServiceInput.payload
+ }
+
+ val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName
+ val workflowName = clonedExecutionServiceInput.actionIdentifiers.actionName
+
+ if (blueprintName == "default") return clonedExecutionServiceInput
+
+ if (clonedExecutionServiceInput.payload
+ .path("$workflowName-request").has("$workflowName-properties")) {
+
+ /** Retrieving sensitive input parameters */
+ val requestId = clonedExecutionServiceInput.commonHeader.requestId
+ val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+ val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+ val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+ val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+ val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName)
+ val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+ val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+ val propertyAssignments: MutableMap<String, JsonNode> =
+ blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+ ?: hashMapOf()
+
+ val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+ val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java)
+
+ /** Storing mapping entries with metadata log-protect set to true */
+ val sensitiveParameters: List<String> = artifactPrefixNames
+ .map { "$it-mapping" }
+ .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+ .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+ .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+ .map { it.name }
+
+ /** Hiding sensitive input parameters from the request */
+ var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+ .path("$workflowName-request")
+ .path("$workflowName-properties") as ObjectNode
+
+ sensitiveParameters.forEach { sensitiveParameter ->
+ if (workflowProperties.has(sensitiveParameter)) {
+ workflowProperties.remove(sensitiveParameter)
+ workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED)
+ }
+ }
+ }
+
+ return clonedExecutionServiceInput
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
new file mode 100644
index 000000000..3f782000b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+import javax.annotation.PostConstruct
+
+/**
+ * Default audit service when no audit publisher is defined, message aren't sent
+ */
+@ConditionalOnProperty(
+ name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
+ havingValue = "false"
+)
+@Service
+class NoPublishAuditService : PublishAuditService {
+
+ val log = logger(NoPublishAuditService::class)
+
+ @PostConstruct
+ fun init() {
+ log.info("Audit service is disabled")
+ }
+
+ override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+ }
+
+ override fun publish(executionServiceOutput: ExecutionServiceOutput) {
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
new file mode 100644
index 000000000..535a5eae0
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+
+interface PublishAuditService {
+ suspend fun publish(executionServiceInput: ExecutionServiceInput)
+ fun publish(executionServiceOutput: ExecutionServiceOutput)
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
index b21f968c9..37f7861be 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
@@ -16,6 +16,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import io.mockk.verify
+import io.mockk.coVerify
+import io.mockk.Runs
+import io.mockk.coEvery
+import io.mockk.just
import io.mockk.mockk
import kotlinx.coroutines.runBlocking
import org.junit.Before
@@ -23,6 +28,7 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -65,13 +71,52 @@ class ExecutionServiceHandlerTest {
}
}
runBlocking {
- val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk())
+ val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk(), mockk())
val isServiceFunction = executionServiceHandler.checkServiceFunction(executionServiceInput)
assertTrue(isServiceFunction, "failed to checkServiceFunction")
val executionServiceOutput = executionServiceHandler.executeServiceFunction(executionServiceInput)
assertNotNull(executionServiceOutput, "failed to get executionServiceOutput")
}
}
+
+ @Test
+ fun testPublishAuditFunction() {
+ val executionServiceInput = ExecutionServiceInput().apply {
+ commonHeader = CommonHeader().apply {
+ requestId = "1234"
+ subRequestId = "1234-12"
+ originatorId = "cds-test"
+ }
+ actionIdentifiers = ActionIdentifiers().apply {
+ blueprintName = "default"
+ blueprintVersion = "1.0.0"
+ actionName = "mock-service-action"
+ }
+ }
+
+ val publishAuditService = mockk<KafkaPublishAuditService>(relaxed = true)
+ val executionServiceHandler = ExecutionServiceHandler(
+ mockk(),
+ mockk(),
+ mockk(),
+ publishAuditService
+ )
+
+ coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs
+
+ var executionServiceOutput: ExecutionServiceOutput? = null
+ runBlocking {
+ executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+ }
+
+ coVerify {
+ publishAuditService.publish(executionServiceInput)
+ }
+
+ verify {
+ publishAuditService.publish(executionServiceOutput!!)
+ }
+ }
}
@Service("mock-service-action")
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
index 6003df1df..fb2189ffb 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
@@ -37,16 +37,23 @@ error.catalog.errorDefinitionDir=./../../../application/src/test/resources/
blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints
blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
-# Kafka-message-lib Configuration
+# Kafka-message-lib Configurations
blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t