diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules')
9 files changed, 158 insertions, 10 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt index a112b6e6c..7c0970202 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt @@ -231,15 +231,17 @@ class BluePrintContext(val serviceTemplate: ServiceTemplate) { return nodeTemplateByName(nodeTemplateName).artifacts } - fun nodeTemplateArtifact(nodeTemplateName: String, artifactName: String): ArtifactDefinition { + fun checkNodeTemplateArtifact(nodeTemplateName: String, artifactName: String): ArtifactDefinition? { return nodeTemplateArtifacts(nodeTemplateName)?.get(artifactName) + } + + fun nodeTemplateArtifact(nodeTemplateName: String, artifactName: String): ArtifactDefinition { + return checkNodeTemplateArtifact(nodeTemplateName, artifactName) ?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s ArtifactDefinition($artifactName)") } fun nodeTemplateArtifactForArtifactType(nodeTemplateName: String, artifactType: String): ArtifactDefinition { - return nodeTemplateArtifacts(nodeTemplateName)?.filter { it.value.type == artifactType }?.map { it.value }?.get( - 0 - ) + return nodeTemplateArtifacts(nodeTemplateName)?.filter { it.value.type == artifactType }?.map { it.value }?.get(0) ?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s Artifact Type($artifactType)") } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt index 83a04d653..09641458c 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -27,6 +27,7 @@ import com.hazelcast.config.FileSystemYamlConfig import com.hazelcast.config.MemberAttributeConfig import com.hazelcast.core.Hazelcast import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.CPSubsystemManagementService import com.hazelcast.cp.lock.FencedLock import com.hazelcast.scheduledexecutor.IScheduledExecutorService import kotlinx.coroutines.delay @@ -48,6 +49,7 @@ open class HazlecastClusterService : BluePrintClusterService { private val log = logger(HazlecastClusterService::class) lateinit var hazelcast: HazelcastInstance + lateinit var cpSubsystemManagementService: CPSubsystemManagementService var joinedClient = false var joinedLite = false @@ -57,7 +59,10 @@ open class HazlecastClusterService : BluePrintClusterService { when (configuration) { is Config -> { joinedLite = configuration.isLiteMember - Hazelcast.newHazelcastInstance(configuration) + val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration) + /** Promote as CP Member */ + promoteAsCPMember(hazelcastInstance) + hazelcastInstance } is ClientConfig -> { joinedClient = true @@ -99,7 +104,10 @@ open class HazlecastClusterService : BluePrintClusterService { hazelcastServerConfiguration.properties = configuration.properties hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig joinedLite = hazelcastServerConfiguration.isLiteMember - Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + /** Promote as CP Member */ + promoteAsCPMember(hazelcastInstance) + hazelcastInstance } } else -> { @@ -169,11 +177,17 @@ open class HazlecastClusterService : BluePrintClusterService { override suspend fun shutDown(duration: Duration) { if (::hazelcast.isInitialized && clusterJoined()) { delay(duration.toMillis()) - hazelcast.lifecycleService.terminate() + HazlecastClusterUtils.terminate(hazelcast) } } /** Utils */ + suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { + if (!joinedClient && !joinedLite) { + HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance) + } + } + suspend fun myHazelcastApplicationMembers(): Map<String, Member> { check(::hazelcast.isInitialized) { "failed to start and join cluster" } check(!isClient()) { "not supported for cluster client members." } @@ -198,11 +212,11 @@ open class BlueprintsClusterMembershipListener(val hazlecastClusterService: Hazl private val log = logger(BlueprintsClusterMembershipListener::class) override fun memberRemoved(membershipEvent: MembershipEvent) { - log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent") + log.info("MembershipEvent: $membershipEvent") } override fun memberAdded(membershipEvent: MembershipEvent) { - log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent") + log.info("MembershipEvent: $membershipEvent") } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt new file mode 100644 index 000000000..70970f6da --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt @@ -0,0 +1,117 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.CPGroup +import com.hazelcast.cp.CPMember +import com.hazelcast.cp.CPSubsystemManagementService +import com.hazelcast.instance.impl.HazelcastInstanceProxy +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.UUID +import java.util.concurrent.TimeUnit + +object HazlecastClusterUtils { + + private val log = logger(HazlecastClusterUtils::class) + + /** Promote [hazelcastInstance] member to CP Member */ + fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { + when (hazelcastInstance) { + is HazelcastInstanceProxy -> { + val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) + cpSubsystemManagementService.promoteToCPMember() + .toCompletableFuture().get() + log.info("Promoted as CP member(${hazelcastInstance.cluster.localMember})") + val clusterCPMembers = clusterCPMembers(hazelcastInstance) + log.info("CP Members(${clusterCPMembers.size}): $clusterCPMembers") + val cpGroupMembers = cpGroupMembers(hazelcastInstance) + log.info("CP Group Members(${cpGroupMembers.size}): $cpGroupMembers") + } + else -> log.debug("Client instance not eligible for CP Member promotion") + } + } + + /** Terminate [hazelcastInstance] member */ + fun terminate(hazelcastInstance: HazelcastInstance) { + log.info("Terminating Member : ${hazelcastInstance.cluster.localMember}") + hazelcastInstance.lifecycleService.terminate() + } + + /** Remove [hazelcastInstance] member from cluster CP Member List*/ + fun removeFromCPMember(hazelcastInstance: HazelcastInstance) { + // check CP Member, then remove */ + val localCPMemberUuid = localCPMemberUUID(hazelcastInstance) + localCPMemberUuid?.let { uuid -> + removeFromCPMember(hazelcastInstance, uuid) + } + } + + /** Remove [removeCPMemberUuid] member from cluster CP Member List, using [hazelcastInstance]*/ + fun removeFromCPMember(hazelcastInstance: HazelcastInstance, removeCPMemberUuid: UUID) { + val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) + cpSubsystemManagementService.removeCPMember(removeCPMemberUuid) + .toCompletableFuture().get() + log.info("Removed CP member($removeCPMemberUuid)") + } + + /** Get [hazelcastInstance] CP Group members*/ + fun cpGroupMembers(hazelcastInstance: HazelcastInstance): List<CPMember> { + return cpGroup(hazelcastInstance).members().toList() + } + + /** Get [hazelcastInstance] CP Group[groupName] members*/ + fun cpGroup( + hazelcastInstance: HazelcastInstance, + groupName: String? = CPGroup.METADATA_CP_GROUP_NAME + ): CPGroup { + return cpSubsystemManagementService(hazelcastInstance).getCPGroup(groupName) + .toCompletableFuture().get() + } + + /** Get [hazelcastInstance] CP member UUIDs*/ + fun clusterCPMemberUUIDs(hazelcastInstance: HazelcastInstance): List<UUID> { + return clusterCPMembers(hazelcastInstance).map { it.uuid } + } + + /** Get [hazelcastInstance] CP members*/ + fun clusterCPMembers(hazelcastInstance: HazelcastInstance): List<CPMember> { + return cpSubsystemManagementService(hazelcastInstance).cpMembers.toCompletableFuture().get().toList() + } + + /** Get CPSubsystemManagementService for [hazelcastInstance] */ + fun cpSubsystemManagementService(hazelcastInstance: HazelcastInstance): CPSubsystemManagementService { + val cpSubsystemManagementService = hazelcastInstance.cpSubsystem.cpSubsystemManagementService + cpSubsystemManagementService.awaitUntilDiscoveryCompleted(3, TimeUnit.MINUTES) + return cpSubsystemManagementService + } + + /** Get local CPMemberUUID for [hazelcastInstance] */ + fun localCPMemberUUID(hazelcastInstance: HazelcastInstance) = localCPMember(hazelcastInstance)?.uuid + + /** Check local member is CP member for [hazelcastInstance] */ + fun checkLocalMemberIsCPMember(hazelcastInstance: HazelcastInstance): Boolean { + return localCPMember(hazelcastInstance) != null + } + + /** Get local CP member for [hazelcastInstance] */ + fun localCPMember(hazelcastInstance: HazelcastInstance) = + cpSubsystemManagementService(hazelcastInstance).localCPMember + + /** Get local CP member UUID for [hazelcastInstance] */ + fun localMemberUUID(hazelcastInstance: HazelcastInstance) = hazelcastInstance.cluster.localMember.uuid +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml index cbf488c95..e7ac273ed 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml @@ -6,6 +6,9 @@ hazelcast: cp-subsystem: cp-member-count: 3 group-size: 3 + session-time-to-live-seconds: 60 + session-heartbeat-interval-seconds: 5 + missing-cp-member-auto-removal-seconds: 120 # network: # join: # multicast: diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml index 356be1d05..cb493d169 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml @@ -6,6 +6,9 @@ hazelcast: cp-subsystem: cp-member-count: 3 group-size: 3 + session-time-to-live-seconds: 60 + session-heartbeat-interval-seconds: 5 + missing-cp-member-auto-removal-seconds: 120 # network: # join: # multicast: diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml index d256f49e3..e60b0c506 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml @@ -6,6 +6,9 @@ hazelcast: cp-subsystem: cp-member-count: 3 group-size: 3 + session-time-to-live-seconds: 60 + session-heartbeat-interval-seconds: 5 + missing-cp-member-auto-removal-seconds: 120 # network: # join: # multicast: diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml index 9c7d566db..3cb10a08b 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml @@ -6,6 +6,9 @@ hazelcast: cp-subsystem: cp-member-count: 3 group-size: 3 + session-time-to-live-seconds: 60 + session-heartbeat-interval-seconds: 5 + missing-cp-member-auto-removal-seconds: 120 # network: # join: # multicast: diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml index dcecf454f..ab5d44804 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml @@ -6,6 +6,9 @@ hazelcast: cp-subsystem: cp-member-count: 3 group-size: 3 + session-time-to-live-seconds: 60 + session-heartbeat-interval-seconds: 5 + missing-cp-member-auto-removal-seconds: 120 # network: # join: # multicast: diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index ebd9d553d..06100f1fc 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -144,7 +144,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS nodeInput: ExecutionServiceInput, nodeOutput: ExecutionServiceOutput ): EdgeLabel { - log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})") + log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})") val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id) checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" } val nodeTemplateName = step.target!! |