diff options
Diffstat (limited to 'ms')
10 files changed, 120 insertions, 104 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index 848647405..50f0b1499 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -123,8 +123,8 @@ open class ComponentRemotePythonExecutor( val executionTimeout = getOptionalOperationInput(INPUT_EXECUTE_TIMEOUT)?.asInt() ?: DEFAULT_EXECUTE_TIMEOUT_IN_SEC - // set the component level timeout as envPrepTimeout + executionTimeout (small delta will be applied in AbstractComponentFunction - val timeout = envPrepTimeout + executionTimeout + // component level timeout should be => env_prep_timeout + execution_timeout + val timeout = implementation.timeout var scriptCommand = command.replace(pythonScript.name, pythonScript.absolutePath) if (args != null && args.isNotEmpty()) { @@ -161,17 +161,17 @@ open class ComponentRemotePythonExecutor( if (prepareEnvOutput.status != StatusType.SUCCESS) { val errorMessage = prepareEnvOutput.payload setNodeOutputErrors(prepareEnvOutput.status.name, - STEP_PREPARE_ENV, - logs, - errorMessage, - isLogResponseEnabled + STEP_PREPARE_ENV, + logs, + errorMessage, + isLogResponseEnabled ) } else { setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), - STEP_PREPARE_ENV, - logsEnv, - "".asJsonPrimitive(), - isLogResponseEnabled + STEP_PREPARE_ENV, + logsEnv, + "".asJsonPrimitive(), + isLogResponseEnabled ) } } else { @@ -180,17 +180,15 @@ open class ComponentRemotePythonExecutor( } } catch (grpcEx: io.grpc.StatusRuntimeException) { val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else "" - val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg" + val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}" setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive()) - setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled) + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) log.error(grpcErrMsg, grpcEx) - addError(grpcErrMsg) } catch (e: Exception) { - val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)." + val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}" setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive()) - setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled) - log.error("Failed to process on remote executor requestId ($processId)", e) - addError(timeoutErrMsg) + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + log.error(timeoutErrMsg, e) } // if Env preparation was successful, then proceed with command execution in this Env if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) { @@ -214,45 +212,47 @@ open class ComponentRemotePythonExecutor( } checkNotNull(remoteExecutionOutput) { - "Error: Request-id $processId did not return a restul from remote command execution." + "Error: Request-id $processId did not return a result from remote command execution." } val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response) if (remoteExecutionOutput.status != StatusType.SUCCESS) { setNodeOutputErrors(remoteExecutionOutput.status.name, - STEP_EXEC_CMD, - logs, - remoteExecutionOutput.payload, - isLogResponseEnabled + STEP_EXEC_CMD, + logs, + remoteExecutionOutput.payload, + isLogResponseEnabled ) } else { setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), - STEP_EXEC_CMD, - logs, - remoteExecutionOutput.payload, - isLogResponseEnabled + STEP_EXEC_CMD, + logs, + remoteExecutionOutput.payload, + isLogResponseEnabled ) } } catch (timeoutEx: TimeoutCancellationException) { val componentLevelWarningMsg = if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else "" val timeoutErrMsg = "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg" - setNodeOutputErrors(status = timeoutErrMsg, - step = STEP_EXEC_CMD, - logs = "".asJsonPrimitive(), - error = "".asJsonPrimitive(), - logging = isLogResponseEnabled + setNodeOutputErrors(status = StatusType.FAILURE.name, + step = STEP_EXEC_CMD, + logs = "".asJsonPrimitive(), + error = timeoutErrMsg.asJsonPrimitive(), + logging = isLogResponseEnabled ) log.error(timeoutErrMsg, timeoutEx) } catch (grpcEx: io.grpc.StatusRuntimeException) { - val timeoutErrMsg = "Command executor failed to execute requestId ($processId) error (${grpcEx.status.cause?.message})" - setNodeOutputErrors(status = timeoutErrMsg, - step = STEP_EXEC_CMD, - logs = "".asJsonPrimitive(), - error = "".asJsonPrimitive(), - logging = isLogResponseEnabled + val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}" + setNodeOutputErrors(status = StatusType.FAILURE.name, + step = STEP_EXEC_CMD, + logs = "".asJsonPrimitive(), + error = timeoutErrMsg.asJsonPrimitive(), + logging = isLogResponseEnabled ) - log.error("Command executor time out during GRPC call", grpcEx) + log.error(timeoutErrMsg, grpcEx) } catch (e: Exception) { - log.error("Failed to process on remote executor requestId ($processId)", e) + val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}" + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + log.error(timeoutErrMsg, e) } } log.debug("Trying to close GRPC channel. request ($processId)") @@ -277,7 +277,14 @@ open class ComponentRemotePythonExecutor( /** * Utility function to set the output properties of the executor node */ - private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) { + private fun setNodeOutputProperties( + status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(), + step: String, + message: JsonNode, + artifacts: JsonNode, + logging: Boolean = true + ) { + setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status) setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 81fc0d709..0a58857f7 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -29,10 +29,10 @@ import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** - * Exposed Dependency Service by this Hazlecast Lib Module + * Exposed Dependency Service by this Hazelcast Lib Module */ fun BluePrintDependencyService.clusterService(): BluePrintClusterService = - instance(HazlecastClusterService::class) + instance(HazelcastClusterService::class) /** Optional Cluster Service, returns only if Cluster is enabled */ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt index feb2a8e2a..d3c88d732 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -45,9 +45,9 @@ import java.time.Duration import java.util.concurrent.TimeUnit @Service -open class HazlecastClusterService : BluePrintClusterService { +open class HazelcastClusterService : BluePrintClusterService { - private val log = logger(HazlecastClusterService::class) + private val log = logger(HazelcastClusterService::class) lateinit var hazelcast: HazelcastInstance lateinit var cpSubsystemManagementService: CPSubsystemManagementService var joinedClient = false @@ -179,14 +179,14 @@ open class HazlecastClusterService : BluePrintClusterService { override suspend fun shutDown(duration: Duration) { if (::hazelcast.isInitialized && clusterJoined()) { delay(duration.toMillis()) - HazlecastClusterUtils.terminate(hazelcast) + HazelcastClusterUtils.terminate(hazelcast) } } /** Utils */ suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { if (!joinedClient && !joinedLite) { - HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance) + HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance) } } @@ -243,17 +243,18 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val } override suspend fun unLock() { - // Added condition to avoid failures like - "Current thread is not owner of the lock!" - if (distributedLock.isLockedByCurrentThread) { - distributedLock.unlock() - log.trace("Cluster unlock(${name()}) successfully..") - } + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") } override fun isLocked(): Boolean { return distributedLock.isLocked } + override fun isLockedByCurrentThread(): Boolean { + return distributedLock.isLockedByCurrentThread + } + override suspend fun fenceLock(): String { val fence = distributedLock.lockAndGetFence() log.trace("Cluster lock($name) fence($fence) created..") diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt index 70970f6da..e5f488a0e 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt @@ -25,9 +25,9 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.util.UUID import java.util.concurrent.TimeUnit -object HazlecastClusterUtils { +object HazelcastClusterUtils { - private val log = logger(HazlecastClusterUtils::class) + private val log = logger(HazelcastClusterUtils::class) /** Promote [hazelcastInstance] member to CP Member */ fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 9725553a5..2d957c289 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -78,6 +78,7 @@ interface ClusterLock { suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean + fun isLockedByCurrentThread(): Boolean fun close() } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt index 80cf41558..e214b6593 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -20,13 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode import com.hazelcast.client.config.YamlClientConfigBuilder import com.hazelcast.cluster.Member import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext +import org.junit.After +import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo @@ -35,16 +39,21 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import java.io.Serializable -import java.time.Duration import java.util.Properties import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue -class HazlecastClusterServiceTest { - private val log = logger(HazlecastClusterServiceTest::class) +class HazelcastClusterServiceTest { + private val log = logger(HazelcastClusterServiceTest::class) private val clusterSize = 3 + @Before + @After + fun killAllHazelcastInstances() { + HazelcastInstanceFactory.terminateAll() + } + @Test fun testClientFileSystemYamlConfig() { System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") @@ -74,33 +83,23 @@ class HazlecastClusterServiceTest { fun testClusterJoin() { runBlocking { val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() - // delay(1000) - // Join as Hazlecast Management Node - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + createCluster(arrayListOf(1, 2, 3)).toMutableList() printReachableMembers(bluePrintClusterServiceOne) testDistributedStore(bluePrintClusterServiceOne) testDistributedLock(bluePrintClusterServiceOne) - - // executeScheduler(bluePrintClusterServiceOne[0]) - // delay(1000) - // Shutdown - shutdown(bluePrintClusterServiceOne) } } private suspend fun createCluster( - ports: List<Int>, + ids: List<Int>, joinAsClient: Boolean? = false ): List<BluePrintClusterService> { return withContext(Dispatchers.Default) { - val deferred = ports.map { port -> + val deferred = ids.map { id -> async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") + val nodeId = "node-$id" + log.info("********** Starting ($nodeId)") val properties = Properties() properties["hazelcast.logging.type"] = "slf4j" val clusterInfo = @@ -117,21 +116,15 @@ class HazlecastClusterServiceTest { properties = properties ) } - val hazlecastClusterService = HazlecastClusterService() - hazlecastClusterService.startCluster(clusterInfo) - hazlecastClusterService + val hazelcastClusterService = HazelcastClusterService() + hazelcastClusterService.startCluster(clusterInfo) + hazelcastClusterService } } deferred.awaitAll() } } - private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - bluePrintClusterService.shutDown(Duration.ofMillis(10)) - } - } - private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { /** Test Distributed store creation */ repeat(2) { storeId -> @@ -159,13 +152,25 @@ class HazlecastClusterServiceTest { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) + newSingleThreadContext("first").use { + withContext(it) { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + } } val deferred2 = async { - executeLock(bluePrintClusterServices[1], "second", lockName) + newSingleThreadContext("second").use { + withContext(it) { + executeLock(bluePrintClusterServices[1], "second", lockName) + } + } } val deferred3 = async { - executeLock(bluePrintClusterServices[2], "third", lockName) + newSingleThreadContext("third").use { + withContext(it) { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + } } deferred.start() deferred2.start() @@ -195,12 +200,12 @@ class HazlecastClusterServiceTest { private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { log.info("initialising ...") - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap assertEquals(3, memberNameMap.size, "failed to match member size") memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } - val scheduler = hazlecastClusterService.clusterScheduler("cleanup") + val scheduler = hazelcastClusterService.clusterScheduler("cleanup") // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) @@ -209,15 +214,15 @@ class HazlecastClusterServiceTest { private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) { bluePrintClusterServices.forEach { bluePrintClusterService -> - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - val hazelcast = hazlecastClusterService.hazelcast + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + val hazelcast = hazelcastClusterService.hazelcast val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null - val master = hazlecastClusterService.masterMember("system").memberAddress - val members = hazlecastClusterService.allMembers().map { it.memberAddress } + val master = hazelcastClusterService.masterMember("system").memberAddress + val members = hazelcastClusterService.allMembers().map { it.memberAddress } log.info("Cluster Members for($self): master($master) Members($members)") } - val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-") assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") log.info("Cluster applicationMembers ($applicationMembers)") } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml index de6047a90..b4dc3454a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml @@ -7,10 +7,15 @@ hazelcast: session-time-to-live-seconds: 60 session-heartbeat-interval-seconds: 5 missing-cp-member-auto-removal-seconds: 120 + metrics: + enabled: false network: join: - multicast: + tcp-ip: enabled: true + interface: 127.0.0.1 + multicast: + enabled: false # Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue # on macOs with docker installed and multicast address different than 224.0.0.1 # https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index 06100f1fc..2aa408527 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -31,17 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflow import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintWorkFlowService import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage -import org.springframework.beans.factory.config.ConfigurableBeanFactory -import org.springframework.context.annotation.Scope import org.springframework.stereotype.Service @Service("imperativeWorkflowExecutionService") class ImperativeWorkflowExecutionService( - private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput> + private val nodeTemplateExecutionService: NodeTemplateExecutionService ) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { @@ -57,15 +54,11 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() - return imperativeBluePrintWorkflowService.executeWorkflow( - graph, bluePrintRuntimeService, - executionServiceInput - ) + return ImperativeBluePrintWorkflowService(nodeTemplateExecutionService) + .executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) } } -@Service -@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService) : AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() { diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt index 1d4738c8d..c200f4ae2 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt @@ -119,8 +119,7 @@ class ImperativeWorkflowExecutionServiceTest { ExecutionServiceInput::class.java )!! - val bluePrintWorkFlowService = ImperativeBluePrintWorkflowService(NodeTemplateExecutionService(mockk())) - val imperativeWorkflowExecutionService = ImperativeWorkflowExecutionService(bluePrintWorkFlowService) + val imperativeWorkflowExecutionService = ImperativeWorkflowExecutionService(NodeTemplateExecutionService(mockk())) val output = imperativeWorkflowExecutionService .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) assertNotNull(output, "failed to get imperative workflow output") diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 0c476b23e..7c9ef84c1 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -151,7 +151,12 @@ class CommandExecutorHandler(): if rc == 0: return utils.build_ret_data(True, results=result, results_log=results_log) else: - err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id) + err_msg = "" + if len(results_log) > 0: + # get exception message + err_msg = "{} - {}".format(self.blueprint_id, results_log[-1:][0]) + else: + err_msg = "{} - Process exited with return code {}".format(self.blueprint_id, rc) return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) def install_packages(self, request, type, f, results): |