summaryrefslogtreecommitdiffstats
path: root/ms
diff options
context:
space:
mode:
Diffstat (limited to 'ms')
-rw-r--r--ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt87
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt (renamed from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt)19
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt (renamed from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt)4
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt (renamed from ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt)79
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml7
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt13
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt3
-rw-r--r--ms/command-executor/src/main/python/command_executor_handler.py7
10 files changed, 120 insertions, 104 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
index 848647405..50f0b1499 100644
--- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
+++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
@@ -123,8 +123,8 @@ open class ComponentRemotePythonExecutor(
val executionTimeout = getOptionalOperationInput(INPUT_EXECUTE_TIMEOUT)?.asInt()
?: DEFAULT_EXECUTE_TIMEOUT_IN_SEC
- // set the component level timeout as envPrepTimeout + executionTimeout (small delta will be applied in AbstractComponentFunction
- val timeout = envPrepTimeout + executionTimeout
+ // component level timeout should be => env_prep_timeout + execution_timeout
+ val timeout = implementation.timeout
var scriptCommand = command.replace(pythonScript.name, pythonScript.absolutePath)
if (args != null && args.isNotEmpty()) {
@@ -161,17 +161,17 @@ open class ComponentRemotePythonExecutor(
if (prepareEnvOutput.status != StatusType.SUCCESS) {
val errorMessage = prepareEnvOutput.payload
setNodeOutputErrors(prepareEnvOutput.status.name,
- STEP_PREPARE_ENV,
- logs,
- errorMessage,
- isLogResponseEnabled
+ STEP_PREPARE_ENV,
+ logs,
+ errorMessage,
+ isLogResponseEnabled
)
} else {
setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
- STEP_PREPARE_ENV,
- logsEnv,
- "".asJsonPrimitive(),
- isLogResponseEnabled
+ STEP_PREPARE_ENV,
+ logsEnv,
+ "".asJsonPrimitive(),
+ isLogResponseEnabled
)
}
} else {
@@ -180,17 +180,15 @@ open class ComponentRemotePythonExecutor(
}
} catch (grpcEx: io.grpc.StatusRuntimeException) {
val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
- val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg"
+ val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}"
setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
- setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled)
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
log.error(grpcErrMsg, grpcEx)
- addError(grpcErrMsg)
} catch (e: Exception) {
- val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
+ val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
- setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled)
- log.error("Failed to process on remote executor requestId ($processId)", e)
- addError(timeoutErrMsg)
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+ log.error(timeoutErrMsg, e)
}
// if Env preparation was successful, then proceed with command execution in this Env
if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
@@ -214,45 +212,47 @@ open class ComponentRemotePythonExecutor(
}
checkNotNull(remoteExecutionOutput) {
- "Error: Request-id $processId did not return a restul from remote command execution."
+ "Error: Request-id $processId did not return a result from remote command execution."
}
val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
if (remoteExecutionOutput.status != StatusType.SUCCESS) {
setNodeOutputErrors(remoteExecutionOutput.status.name,
- STEP_EXEC_CMD,
- logs,
- remoteExecutionOutput.payload,
- isLogResponseEnabled
+ STEP_EXEC_CMD,
+ logs,
+ remoteExecutionOutput.payload,
+ isLogResponseEnabled
)
} else {
setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
- STEP_EXEC_CMD,
- logs,
- remoteExecutionOutput.payload,
- isLogResponseEnabled
+ STEP_EXEC_CMD,
+ logs,
+ remoteExecutionOutput.payload,
+ isLogResponseEnabled
)
}
} catch (timeoutEx: TimeoutCancellationException) {
val componentLevelWarningMsg = if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else ""
val timeoutErrMsg = "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg"
- setNodeOutputErrors(status = timeoutErrMsg,
- step = STEP_EXEC_CMD,
- logs = "".asJsonPrimitive(),
- error = "".asJsonPrimitive(),
- logging = isLogResponseEnabled
+ setNodeOutputErrors(status = StatusType.FAILURE.name,
+ step = STEP_EXEC_CMD,
+ logs = "".asJsonPrimitive(),
+ error = timeoutErrMsg.asJsonPrimitive(),
+ logging = isLogResponseEnabled
)
log.error(timeoutErrMsg, timeoutEx)
} catch (grpcEx: io.grpc.StatusRuntimeException) {
- val timeoutErrMsg = "Command executor failed to execute requestId ($processId) error (${grpcEx.status.cause?.message})"
- setNodeOutputErrors(status = timeoutErrMsg,
- step = STEP_EXEC_CMD,
- logs = "".asJsonPrimitive(),
- error = "".asJsonPrimitive(),
- logging = isLogResponseEnabled
+ val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}"
+ setNodeOutputErrors(status = StatusType.FAILURE.name,
+ step = STEP_EXEC_CMD,
+ logs = "".asJsonPrimitive(),
+ error = timeoutErrMsg.asJsonPrimitive(),
+ logging = isLogResponseEnabled
)
- log.error("Command executor time out during GRPC call", grpcEx)
+ log.error(timeoutErrMsg, grpcEx)
} catch (e: Exception) {
- log.error("Failed to process on remote executor requestId ($processId)", e)
+ val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+ log.error(timeoutErrMsg, e)
}
}
log.debug("Trying to close GRPC channel. request ($processId)")
@@ -277,7 +277,14 @@ open class ComponentRemotePythonExecutor(
/**
* Utility function to set the output properties of the executor node
*/
- private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) {
+ private fun setNodeOutputProperties(
+ status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(),
+ step: String,
+ message: JsonNode,
+ artifacts: JsonNode,
+ logging: Boolean = true
+ ) {
+
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
index 81fc0d709..0a58857f7 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
@@ -29,10 +29,10 @@ import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
/**
- * Exposed Dependency Service by this Hazlecast Lib Module
+ * Exposed Dependency Service by this Hazelcast Lib Module
*/
fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
- instance(HazlecastClusterService::class)
+ instance(HazelcastClusterService::class)
/** Optional Cluster Service, returns only if Cluster is enabled */
fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
index feb2a8e2a..d3c88d732 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
@@ -45,9 +45,9 @@ import java.time.Duration
import java.util.concurrent.TimeUnit
@Service
-open class HazlecastClusterService : BluePrintClusterService {
+open class HazelcastClusterService : BluePrintClusterService {
- private val log = logger(HazlecastClusterService::class)
+ private val log = logger(HazelcastClusterService::class)
lateinit var hazelcast: HazelcastInstance
lateinit var cpSubsystemManagementService: CPSubsystemManagementService
var joinedClient = false
@@ -179,14 +179,14 @@ open class HazlecastClusterService : BluePrintClusterService {
override suspend fun shutDown(duration: Duration) {
if (::hazelcast.isInitialized && clusterJoined()) {
delay(duration.toMillis())
- HazlecastClusterUtils.terminate(hazelcast)
+ HazelcastClusterUtils.terminate(hazelcast)
}
}
/** Utils */
suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
if (!joinedClient && !joinedLite) {
- HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance)
+ HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
}
}
@@ -243,17 +243,18 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
}
override suspend fun unLock() {
- // Added condition to avoid failures like - "Current thread is not owner of the lock!"
- if (distributedLock.isLockedByCurrentThread) {
- distributedLock.unlock()
- log.trace("Cluster unlock(${name()}) successfully..")
- }
+ distributedLock.unlock()
+ log.trace("Cluster unlock(${name()}) successfully..")
}
override fun isLocked(): Boolean {
return distributedLock.isLocked
}
+ override fun isLockedByCurrentThread(): Boolean {
+ return distributedLock.isLockedByCurrentThread
+ }
+
override suspend fun fenceLock(): String {
val fence = distributedLock.lockAndGetFence()
log.trace("Cluster lock($name) fence($fence) created..")
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt
index 70970f6da..e5f488a0e 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt
@@ -25,9 +25,9 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.util.UUID
import java.util.concurrent.TimeUnit
-object HazlecastClusterUtils {
+object HazelcastClusterUtils {
- private val log = logger(HazlecastClusterUtils::class)
+ private val log = logger(HazelcastClusterUtils::class)
/** Promote [hazelcastInstance] member to CP Member */
fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index 9725553a5..2d957c289 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -78,6 +78,7 @@ interface ClusterLock {
suspend fun tryFenceLock(timeout: Long): String
suspend fun unLock()
fun isLocked(): Boolean
+ fun isLockedByCurrentThread(): Boolean
fun close()
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
index 80cf41558..e214b6593 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt
@@ -20,13 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode
import com.hazelcast.client.config.YamlClientConfigBuilder
import com.hazelcast.cluster.Member
import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.instance.impl.HazelcastInstanceFactory
import com.hazelcast.map.IMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
+import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
+import org.junit.After
+import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
@@ -35,16 +39,21 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import java.io.Serializable
-import java.time.Duration
import java.util.Properties
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
-class HazlecastClusterServiceTest {
- private val log = logger(HazlecastClusterServiceTest::class)
+class HazelcastClusterServiceTest {
+ private val log = logger(HazelcastClusterServiceTest::class)
private val clusterSize = 3
+ @Before
+ @After
+ fun killAllHazelcastInstances() {
+ HazelcastInstanceFactory.terminateAll()
+ }
+
@Test
fun testClientFileSystemYamlConfig() {
System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
@@ -74,33 +83,23 @@ class HazlecastClusterServiceTest {
fun testClusterJoin() {
runBlocking {
val bluePrintClusterServiceOne =
- createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
- // delay(1000)
- // Join as Hazlecast Management Node
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
- // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ createCluster(arrayListOf(1, 2, 3)).toMutableList()
printReachableMembers(bluePrintClusterServiceOne)
testDistributedStore(bluePrintClusterServiceOne)
testDistributedLock(bluePrintClusterServiceOne)
-
- // executeScheduler(bluePrintClusterServiceOne[0])
- // delay(1000)
- // Shutdown
- shutdown(bluePrintClusterServiceOne)
}
}
private suspend fun createCluster(
- ports: List<Int>,
+ ids: List<Int>,
joinAsClient: Boolean? = false
): List<BluePrintClusterService> {
return withContext(Dispatchers.Default) {
- val deferred = ports.map { port ->
+ val deferred = ids.map { id ->
async(Dispatchers.IO) {
- val nodeId = "node-$port"
- log.info("********** Starting node($nodeId) on port($port)")
+ val nodeId = "node-$id"
+ log.info("********** Starting ($nodeId)")
val properties = Properties()
properties["hazelcast.logging.type"] = "slf4j"
val clusterInfo =
@@ -117,21 +116,15 @@ class HazlecastClusterServiceTest {
properties = properties
)
}
- val hazlecastClusterService = HazlecastClusterService()
- hazlecastClusterService.startCluster(clusterInfo)
- hazlecastClusterService
+ val hazelcastClusterService = HazelcastClusterService()
+ hazelcastClusterService.startCluster(clusterInfo)
+ hazelcastClusterService
}
}
deferred.awaitAll()
}
}
- private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
- bluePrintClusterServices.forEach { bluePrintClusterService ->
- bluePrintClusterService.shutDown(Duration.ofMillis(10))
- }
- }
-
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
@@ -159,13 +152,25 @@ class HazlecastClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices[0], "first", lockName)
+ newSingleThreadContext("first").use {
+ withContext(it) {
+ executeLock(bluePrintClusterServices[0], "first", lockName)
+ }
+ }
}
val deferred2 = async {
- executeLock(bluePrintClusterServices[1], "second", lockName)
+ newSingleThreadContext("second").use {
+ withContext(it) {
+ executeLock(bluePrintClusterServices[1], "second", lockName)
+ }
+ }
}
val deferred3 = async {
- executeLock(bluePrintClusterServices[2], "third", lockName)
+ newSingleThreadContext("third").use {
+ withContext(it) {
+ executeLock(bluePrintClusterServices[2], "third", lockName)
+ }
+ }
}
deferred.start()
deferred2.start()
@@ -195,12 +200,12 @@ class HazlecastClusterServiceTest {
private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
log.info("initialising ...")
- val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
+ val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
assertEquals(3, memberNameMap.size, "failed to match member size")
memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
- val scheduler = hazlecastClusterService.clusterScheduler("cleanup")
+ val scheduler = hazelcastClusterService.clusterScheduler("cleanup")
// scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
// scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
// scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
@@ -209,15 +214,15 @@ class HazlecastClusterServiceTest {
private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
bluePrintClusterServices.forEach { bluePrintClusterService ->
- val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
- val hazelcast = hazlecastClusterService.hazelcast
+ val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService
+ val hazelcast = hazelcastClusterService.hazelcast
val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
- val master = hazlecastClusterService.masterMember("system").memberAddress
- val members = hazlecastClusterService.allMembers().map { it.memberAddress }
+ val master = hazelcastClusterService.masterMember("system").memberAddress
+ val members = hazelcastClusterService.allMembers().map { it.memberAddress }
log.info("Cluster Members for($self): master($master) Members($members)")
}
- val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56")
+ val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-")
assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
log.info("Cluster applicationMembers ($applicationMembers)")
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml
index de6047a90..b4dc3454a 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml
@@ -7,10 +7,15 @@ hazelcast:
session-time-to-live-seconds: 60
session-heartbeat-interval-seconds: 5
missing-cp-member-auto-removal-seconds: 120
+ metrics:
+ enabled: false
network:
join:
- multicast:
+ tcp-ip:
enabled: true
+ interface: 127.0.0.1
+ multicast:
+ enabled: false
# Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue
# on macOs with docker installed and multicast address different than 224.0.0.1
# https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
index 06100f1fc..2aa408527 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
@@ -31,17 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflow
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintWorkFlowService
import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
-import org.springframework.beans.factory.config.ConfigurableBeanFactory
-import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Service
@Service("imperativeWorkflowExecutionService")
class ImperativeWorkflowExecutionService(
- private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>
+ private val nodeTemplateExecutionService: NodeTemplateExecutionService
) :
BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
@@ -57,15 +54,11 @@ class ImperativeWorkflowExecutionService(
val graph = bluePrintContext.workflowByName(workflowName).asGraph()
- return imperativeBluePrintWorkflowService.executeWorkflow(
- graph, bluePrintRuntimeService,
- executionServiceInput
- )
+ return ImperativeBluePrintWorkflowService(nodeTemplateExecutionService)
+ .executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput)
}
}
-@Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService) :
AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
index 1d4738c8d..c200f4ae2 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
@@ -119,8 +119,7 @@ class ImperativeWorkflowExecutionServiceTest {
ExecutionServiceInput::class.java
)!!
- val bluePrintWorkFlowService = ImperativeBluePrintWorkflowService(NodeTemplateExecutionService(mockk()))
- val imperativeWorkflowExecutionService = ImperativeWorkflowExecutionService(bluePrintWorkFlowService)
+ val imperativeWorkflowExecutionService = ImperativeWorkflowExecutionService(NodeTemplateExecutionService(mockk()))
val output = imperativeWorkflowExecutionService
.executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
assertNotNull(output, "failed to get imperative workflow output")
diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py
index 0c476b23e..7c9ef84c1 100644
--- a/ms/command-executor/src/main/python/command_executor_handler.py
+++ b/ms/command-executor/src/main/python/command_executor_handler.py
@@ -151,7 +151,12 @@ class CommandExecutorHandler():
if rc == 0:
return utils.build_ret_data(True, results=result, results_log=results_log)
else:
- err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id)
+ err_msg = ""
+ if len(results_log) > 0:
+ # get exception message
+ err_msg = "{} - {}".format(self.blueprint_id, results_log[-1:][0])
+ else:
+ err_msg = "{} - Process exited with return code {}".format(self.blueprint_id, rc)
return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
def install_packages(self, request, type, f, results):