diff options
13 files changed, 266 insertions, 112 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties index bf5e23bc9..fb32d9afc 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties @@ -69,9 +69,9 @@ blueprintsprocessor.grpcclient.py-executor.trustCertCollection=src/main/resource # Blueprint Processor File Execution and Handling Properties ### use absolute paths if testing inside docker -### blueprintsprocessor.blueprintDeployPath=/opt/app/onap/blueprints/deploy -### blueprintsprocessor.blueprintArchivePath=/opt/app/onap/blueprints/archive -### blueprintsprocessor.blueprintWorkingPath=/opt/app/onap/blueprints/working +#blueprintsprocessor.blueprintDeployPath=/opt/app/onap/blueprints/deploy +#blueprintsprocessor.blueprintArchivePath=/opt/app/onap/blueprints/archive +#blueprintsprocessor.blueprintWorkingPath=/opt/app/onap/blueprints/working # db @@ -120,10 +120,12 @@ blueprintsprocessor.netconfExecutor.enabled=true blueprintsprocessor.restConfExecutor.enabled=true blueprintsprocessor.cliExecutor.enabled=true ### If enabling remote python executor, set this value to true -### blueprintsprocessor.remoteScriptCommand.enabled=true +#blueprintsprocessor.remoteScriptCommand.enabled=true blueprintsprocessor.remoteScriptCommand.enabled=false +blueprintsprocessor.remote-script-command.response.log.enabled=false # Kafka-message-lib Configurations +## Request consumer blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092 @@ -141,6 +143,12 @@ blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000 #blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user #blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword +## Response producer +blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.clientId=producer-id +blueprintsprocessor.messageproducer.self-service-api.topic=producer.t + # Kafka audit service Configurations ## Audit request blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index d66e8b374..d4c8841a8 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -21,6 +21,7 @@ import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.async import kotlinx.coroutines.withTimeout +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteIdentifier @@ -47,11 +48,15 @@ import org.springframework.stereotype.Component @ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION]) @Component("component-remote-python-executor") @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) -open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService) : AbstractComponentFunction() { +open class ComponentRemotePythonExecutor( + private val remoteScriptExecutionService: RemoteScriptExecutionService, + private var bluePrintPropertiesService: BluePrintPropertiesService +) : AbstractComponentFunction() { private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!! companion object { + const val SELECTOR_CMD_EXEC = "blueprintsprocessor.remote-script-command" const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector" const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties" const val INPUT_ARGUMENT_PROPERTIES = "argument-properties" @@ -62,6 +67,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout" const val INPUT_EXECUTE_TIMEOUT = "execution-timeout" + const val STEP_PREPARE_ENV = "prepare-env" + const val STEP_EXEC_CMD = "execute-command" const val ATTRIBUTE_EXEC_CMD_STATUS = "status" const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs" const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs" @@ -74,6 +81,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic log.debug("Processing : $operationInputs") + val isLogResponseEnabled = bluePrintPropertiesService.propertyBeanType("$SELECTOR_CMD_EXEC.response.log.enabled", Boolean::class.java) + val bluePrintContext = bluePrintRuntimeService.bluePrintContext() val blueprintName = bluePrintContext.name() val blueprintVersion = bluePrintContext.version() @@ -142,15 +151,25 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic ) val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput) log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}") - val logs = prepareEnvOutput.response + val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response) val logsEnv = logs.toString().asJsonPrimitive() setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv) if (prepareEnvOutput.status != StatusType.SUCCESS) { - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive()) - setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv) + val errorMessage = prepareEnvOutput.payload + setNodeOutputErrors(prepareEnvOutput.status.name, + STEP_PREPARE_ENV, + logs, + errorMessage, + isLogResponseEnabled + ) } else { - setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive()) + setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), + STEP_PREPARE_ENV, + logsEnv, + "".asJsonPrimitive(), + isLogResponseEnabled + ) } } else { // set env preparation log to empty... @@ -159,13 +178,13 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic } catch (grpcEx: io.grpc.StatusRuntimeException) { val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId)." setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive()) - setNodeOutputErrors(status = grpcErrMsg, message = "${grpcEx.status}".asJsonPrimitive()) + setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled) log.error(grpcErrMsg, grpcEx) addError(grpcErrMsg) } catch (e: Exception) { val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)." setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive()) - setNodeOutputErrors(status = timeoutErrMsg, message = "${e.message}".asJsonPrimitive()) + setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled) log.error("Failed to process on remote executor requestId ($processId)", e) addError(timeoutErrMsg) } @@ -195,18 +214,37 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic } val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response) if (remoteExecutionOutput.status != StatusType.SUCCESS) { - setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload) + setNodeOutputErrors(remoteExecutionOutput.status.name, + STEP_EXEC_CMD, + logs, + remoteExecutionOutput.payload, + isLogResponseEnabled + ) } else { - setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs, - remoteExecutionOutput.payload) + setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), + STEP_EXEC_CMD, + logs, + remoteExecutionOutput.payload, + isLogResponseEnabled + ) } } catch (timeoutEx: TimeoutCancellationException) { val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)" - setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive()) + setNodeOutputErrors(status = timeoutErrMsg, + step = STEP_EXEC_CMD, + logs = "".asJsonPrimitive(), + error = "".asJsonPrimitive(), + logging = isLogResponseEnabled + ) log.error(timeoutErrMsg, timeoutEx) } catch (grpcEx: io.grpc.StatusRuntimeException) { val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)" - setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive()) + setNodeOutputErrors(status = timeoutErrMsg, + step = STEP_EXEC_CMD, + logs = "".asJsonPrimitive(), + error = "".asJsonPrimitive(), + logging = isLogResponseEnabled + ) log.error("Command executor time out during GRPC call", grpcEx) } catch (e: Exception) { log.error("Failed to process on remote executor requestId ($processId)", e) @@ -234,25 +272,38 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic /** * Utility function to set the output properties of the executor node */ - private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) { + private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) { setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status) - log.info("Executor status : $status") setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) - log.info("Executor artifacts: $artifacts") setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) - log.info("Executor message : $message") + + if (logging) { + log.info("Executor status : $step : $status") + log.info("Executor artifacts: $step : $artifacts") + log.info("Executor message : $step : $message") + } } /** * Utility function to set the output properties and errors of the executor node, in cas of errors */ - private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) { + private fun setNodeOutputErrors( + status: String, + step: String, + logs: JsonNode = "N/A".asJsonPrimitive(), + error: JsonNode, + logging: Boolean = true + ) { setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive()) - log.info("Executor status : $status") - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) - log.info("Executor message : $message") - setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) - log.info("Executor artifacts: $artifacts") - addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString()) + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs) + setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive()) + + if (logging) { + log.info("Executor status : $step : $status") + log.info("Executor message : $step : $error") + log.info("Executor logs : $step : $logs") + } + + addError(status, step, error.toString()) } } diff --git a/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt b/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt index 5e57b9eb7..d4edf4bb2 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt @@ -22,6 +22,7 @@ import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.runBlocking import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteScriptExecutionInput @@ -47,7 +48,10 @@ class ComponentRemotePythonExecutorTest { runBlocking { val remoteScriptExecutionService = MockRemoteScriptExecutionService() - val componentRemotePythonExecutor = ComponentRemotePythonExecutor(remoteScriptExecutionService) + val componentRemotePythonExecutor = ComponentRemotePythonExecutor( + remoteScriptExecutionService, + mockk<BluePrintPropertiesService>() + ) val executionServiceInput = JacksonUtils.readValueFromClassPathFile( @@ -88,7 +92,10 @@ class ComponentRemotePythonExecutorTest { fun testComponentRemotePythonExecutorProcessNB() { runBlocking { val remoteScriptExecutionService = MockRemoteScriptExecutionService() - val componentRemotePythonExecutor = ComponentRemotePythonExecutor(remoteScriptExecutionService) + val componentRemotePythonExecutor = ComponentRemotePythonExecutor( + remoteScriptExecutionService, + mockk<BluePrintPropertiesService>() + ) val bluePrintRuntime = mockk<DefaultBluePrintRuntimeService>("123456-1000") every { bluePrintRuntime.getBluePrintError() } answers { BluePrintError() } // successful case. diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index ac35fbf2c..b07d64388 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -78,7 +78,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() @@ -142,7 +142,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() @@ -218,7 +218,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 931f052ed..e4991d2d8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -17,12 +17,16 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory @@ -39,6 +43,10 @@ class KafkaMessageProducerService( private val messageLoggerService = MessageLoggerService() + companion object { + const val MAX_ERR_MSG_LEN = 128 + } + override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } return sendMessageNB(messageProducerProperties.topic!!, message) @@ -54,9 +62,14 @@ class KafkaMessageProducerService( message: Any, headers: MutableMap<String, String>? ): Boolean { - val byteArrayMessage = when (message) { - is String -> message.toByteArray(Charset.defaultCharset()) - else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + var clonedMessage = message + if (clonedMessage is ExecutionServiceOutput) { + clonedMessage = truncateResponse(clonedMessage) + } + + val byteArrayMessage = when (clonedMessage) { + is String -> clonedMessage.toByteArray(Charset.defaultCharset()) + else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset()) } val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) @@ -85,4 +98,37 @@ class KafkaMessageProducerService( return kafkaProducer!! } + + /** + * Truncation of BP responses + */ + private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput { + /** Truncation of error messages */ + var truncErrMsg = executionServiceOutput.status.errorMessage + if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) { + truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" + + " [...]. Check Blueprint Processor logs for more information." + } + /** Truncation for Command Executor responses */ + var truncPayload = executionServiceOutput.payload.deepCopy() + val workflowName = executionServiceOutput.actionIdentifiers.actionName + if (truncPayload.path("$workflowName-response").has("execute-command-logs")) { + var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode + cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive()) + } + return ExecutionServiceOutput().apply { + correlationUUID = executionServiceOutput.correlationUUID + commonHeader = executionServiceOutput.commonHeader + actionIdentifiers = executionServiceOutput.actionIdentifiers + status = Status().apply { + code = executionServiceOutput.status.code + eventType = executionServiceOutput.status.eventType + timestamp = executionServiceOutput.status.timestamp + errorMessage = truncErrMsg + message = executionServiceOutput.status.message + } + payload = truncPayload + stepData = executionServiceOutput.stepData + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index ac08dc7b7..fdf6e48e7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -214,7 +214,7 @@ open class BlueprintMessageConsumerServiceTest { SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + "username=\"sample-user\" " + diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 72a47ed56..da7394998 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -109,7 +109,7 @@ open class BlueprintMessageProducerServiceTest { SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + "username=\"sample-user\" " + diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 49f2a50d5..a95af8123 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -51,13 +51,15 @@ open class BluePrintProcessingKafkaConsumer( companion object { const val CONSUMER_SELECTOR = "self-service-api" + const val PRODUCER_SELECTOR = "self-service-api" } @EventListener(ApplicationReadyEvent::class) fun setupMessageListener() = runBlocking { try { log.info( - "Setting up message consumer($CONSUMER_SELECTOR)" + "Setting up message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." ) /** Get the Message Consumer Service **/ @@ -72,6 +74,18 @@ open class BluePrintProcessingKafkaConsumer( throw BluePrintProcessorException("failed to create consumer service ${e.message}") } + /** Get the Message Producer Service **/ + val blueprintMessageProducerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(PRODUCER_SELECTOR) + } catch (e: BluePrintProcessorException) { + val errorMsg = "Failed creating Kafka producer message service." + throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, + "Wrong Kafka selector provided or internal error in Kafka service.") + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + launch { /** Subscribe to the consumer topics */ val additionalConfig: MutableMap<String, Any> = hashMapOf() @@ -82,7 +96,8 @@ open class BluePrintProcessingKafkaConsumer( ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() - executionServiceHandler.doProcess(executionServiceInput) + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + blueprintMessageProducerService.sendMessage(executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { @@ -93,7 +108,8 @@ open class BluePrintProcessingKafkaConsumer( } } catch (e: Exception) { log.error( - "failed to start message consumer($CONSUMER_SELECTOR) ", e + "failed to start message consumer($CONSUMER_SELECTOR) " + + "message producer($PRODUCER_SELECTOR) ", e ) } } @@ -102,7 +118,8 @@ open class BluePrintProcessingKafkaConsumer( fun shutdownMessageListener() = runBlocking { try { log.info( - "Shutting down message consumer($CONSUMER_SELECTOR)" + "Shutting down message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." ) blueprintMessageConsumerService.shutDown() ph.arriveAndAwaitAdvance() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 129e7a54d..1c5d47c27 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -24,6 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils @@ -48,12 +49,9 @@ class KafkaPublishAuditService( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, private val blueprintsProcessorCatalogService: BluePrintCatalogService ) : PublishAuditService { - private var inputInstance: BlueprintMessageProducerService? = null private var outputInstance: BlueprintMessageProducerService? = null - private lateinit var correlationUUID: String - private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) companion object { @@ -127,7 +125,8 @@ class KafkaPublishAuditService( correlationUUID = executionServiceInput.correlationUUID commonHeader = executionServiceInput.commonHeader actionIdentifiers = executionServiceInput.actionIdentifiers - payload = executionServiceInput.payload + payload = executionServiceInput.payload.deepCopy() + stepData = executionServiceInput.stepData } val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName @@ -173,8 +172,7 @@ class KafkaPublishAuditService( sensitiveParameters.forEach { sensitiveParameter -> if (workflowProperties.has(sensitiveParameter)) { - workflowProperties.remove(sensitiveParameter) - workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED) + workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) } } } diff --git a/ms/command-executor/src/main/docker/Dockerfile b/ms/command-executor/src/main/docker/Dockerfile index c38126066..7a20469b7 100644 --- a/ms/command-executor/src/main/docker/Dockerfile +++ b/ms/command-executor/src/main/docker/Dockerfile @@ -3,7 +3,7 @@ FROM python:3.6-slim ENV GRPC_PYTHON_VERSION 1.20.0 RUN python -m pip install --upgrade pip RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION} -RUN pip install virtualenv==16.7.9 +RUN pip install virtualenv==16.7.9 pympler==0.8 RUN groupadd -r onap && useradd -r -g onap onap diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 1e6f03b81..0c476b23e 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -43,43 +43,48 @@ class CommandExecutorHandler(): def is_installed(self): return os.path.exists(self.installed) - def prepare_env(self, request, results): + def prepare_env(self, request): + results_log = [] if not self.is_installed(): create_venv_status = self.create_venv() - if not create_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status["err_msg"]) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) activate_venv_status = self.activate_venv() - if not activate_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status["err_msg"]) + if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) try: with open(self.installed, "w+") as f: - if not self.install_packages(request, CommandExecutor_pb2.pip, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id)) + if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) f.write("\r\n") # TODO: is \r needed? - results.append("\n") - if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id)) + results_log.append("\n") + if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) except Exception as ex: err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_id, ex) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) else: try: with open(self.installed, "r") as f: - results.append(f.read()) + results_log.append(f.read()) except Exception as ex: - return utils.build_ret_data(False, "ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)) + err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex) + return utils.build_ret_data(False, error=err_msg) # deactivate_venv(blueprint_id) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True, results_log=results_log) - def execute_command(self, request, results): - payload_result = {} + def execute_command(self, request): + results_log = [] + result = {} # workaround for when packages are not specified, we may not want to go through the install step # can just call create_venv from here. if not self.is_installed(): @@ -87,16 +92,14 @@ class CommandExecutorHandler(): try: if not self.is_installed(): create_venv_status = self.create_venv - if not create_venv_status["cds_is_successful"]: - err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status["err_msg"]) - results.append(err_msg) - return utils.build_ret_data(False, err_msg) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) + return utils.build_ret_data(False, error=err_msg) activate_response = self.activate_venv() - if not activate_response["cds_is_successful"]: - orig_error = activate_response["err_msg"] + if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]: + orig_error = activate_response[utils.ERR_MSG_KEY] err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_id, orig_error) - results.append(err_msg) #TODO: get rid of results and just rely on the return data struct. - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) cmd = "cd " + self.venv_home @@ -131,26 +134,25 @@ class CommandExecutorHandler(): payload = '\n'.join(payload_section) msg = email.parser.Parser().parsestr(payload) for part in msg.get_payload(): - payload_result = json.loads(part.get_payload()) + result = json.loads(part.get_payload()) if output and not is_payload_section: self.logger.info(output.strip()) - results.append(output.strip()) + results_log.append(output.strip()) else: payload_section.append(output.strip()) rc = newProcess.poll() except Exception as e: err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e) - self.logger.info(err_msg) - results.append(e) - payload_result.update(utils.build_ret_data(False, err_msg)) - return payload_result + return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) # deactivate_venv(blueprint_id) #Since return code is only used to check if it's zero (success), we can just return success flag instead. self.logger.debug("python return_code : {}".format(rc)) - is_execution_successful = rc == 0 - payload_result.update(utils.build_ret_data(is_execution_successful, "")) - return payload_result + if rc == 0: + return utils.build_ret_data(True, results=result, results_log=results_log) + else: + err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id) + return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) def install_packages(self, request, type, f, results): success = self.install_python_packages('UTILITY', results) diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py index 3435e2272..207097605 100644 --- a/ms/command-executor/src/main/python/command_executor_server.py +++ b/ms/command-executor/src/main/python/command_executor_server.py @@ -22,9 +22,6 @@ import proto.CommandExecutor_pb2_grpc as CommandExecutor_pb2_grpc from command_executor_handler import CommandExecutorHandler import utils -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServicer): def __init__(self): @@ -35,14 +32,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi self.logger.info("{} - Received prepareEnv request".format(blueprint_id)) self.logger.info(request) - results = [] handler = CommandExecutorHandler(request) - prepare_env_response = handler.prepare_env(request, results) - if not prepare_env_response["cds_is_successful"]: - self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, False) - self.logger.info("{} - Package installation logs {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, True) + prepare_env_response = handler.prepare_env(request) + if prepare_env_response[utils.CDS_IS_SUCCESSFUL_KEY]: + self.logger.info("{} - Package installation logs {}".format(blueprint_id, prepare_env_response[utils.RESULTS_LOG_KEY])) + else: + self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, prepare_env_response[utils.ERR_MSG_KEY])) + self.logger.info("Prepare Env Response returned : %s" % prepare_env_response) + return utils.build_grpc_response(request.requestId, prepare_env_response) def executeCommand(self, request, context): blueprint_id = utils.get_blueprint_id(request) @@ -53,13 +50,15 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi log_results = [] payload_result = {} handler = CommandExecutorHandler(request) - payload_result = handler.execute_command(request, log_results) - if not payload_result["cds_is_successful"]: - self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, log_results)) - else: + exec_cmd_response = handler.execute_command(request) + if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]: self.logger.info("{} - Execution finished successfully.".format(blueprint_id)) + self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY])) + self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY])) + else: + self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY])) - ret = utils.build_grpc_response(request, log_results, payload_result, payload_result["cds_is_successful"]) - self.logger.info("Payload returned %s" % payload_result) + ret = utils.build_grpc_response(request.requestId, exec_cmd_response) + self.logger.info("Response returned : {}".format(exec_cmd_response)) return ret
\ No newline at end of file diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index 574be51db..b98241629 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,7 +17,13 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json +from pympler import asizeof +CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" +ERR_MSG_KEY = "err_msg" +RESULTS_KEY = "results" +RESULTS_LOG_KEY = "results_log" +TRUNC_MSG_LEN = 3 * 1024 * 1024 def get_blueprint_id(request): blueprint_name = request.identifiers.blueprintName @@ -25,27 +31,47 @@ def get_blueprint_id(request): return blueprint_name + '/' + blueprint_version # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element -def build_grpc_response(request, log_results, payload_return, is_success=False): - if is_success: +def build_grpc_response(request_id, response): + if response[CDS_IS_SUCCESSFUL_KEY]: status = CommandExecutor_pb2.SUCCESS + payload = json.dumps(response[RESULTS_KEY]) else: status = CommandExecutor_pb2.FAILURE + # truncate error message if too heavy + if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN: + response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN]) + payload = json.dumps(response[ERR_MSG_KEY]) + + # truncate cmd-exec logs if too heavy + response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY]) timestamp = Timestamp() timestamp.GetCurrentTime() - if "cds_is_successful" in payload_return: - payload_return.pop('cds_is_successful') - payload_str = json.dumps(payload_return) - return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId, - response=log_results, + return CommandExecutor_pb2.ExecutionOutput(requestId=request_id, + response=response[RESULTS_LOG_KEY], status=status, - payload=payload_str, + payload=payload, timestamp=timestamp) -# build a return data structure which may contain an error message -def build_ret_data(cds_is_successful, err_msg): - ret_data = {"cds_is_successful": cds_is_successful } - if err_msg != "": - ret_data["err_msg"] = err_msg +# build a ret data structure +def build_ret_data(cds_is_successful, results={}, results_log=[], error=None): + ret_data = { + CDS_IS_SUCCESSFUL_KEY: cds_is_successful, + RESULTS_KEY: results, + RESULTS_LOG_KEY: results_log + } + if error: + ret_data[ERR_MSG_KEY] = error return ret_data + +def truncate_cmd_exec_logs(logs): + truncated_logs = [] + truncated_logs_memsize = 0 + for log in logs: + truncated_logs_memsize += asizeof.asizeof(log) + if truncated_logs_memsize > TRUNC_MSG_LEN: + truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.") + break + truncated_logs.append(log) + return truncated_logs
\ No newline at end of file |