aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt
blob: 613fc4a58f13a84ebc9c56300f69ba699db57d2b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*
 * Copyright © 2018-2019 AT&T Intellectual Property.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster

import com.hazelcast.client.HazelcastClient
import com.hazelcast.client.config.ClientConfig
import com.hazelcast.client.config.YamlClientConfigBuilder
import com.hazelcast.cluster.Member
import com.hazelcast.cluster.MembershipEvent
import com.hazelcast.cluster.MembershipListener
import com.hazelcast.config.Config
import com.hazelcast.config.FileSystemYamlConfig
import com.hazelcast.config.MemberAttributeConfig
import com.hazelcast.core.Hazelcast
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.cp.CPSubsystemManagementService
import com.hazelcast.cp.lock.FencedLock
import com.hazelcast.scheduledexecutor.IScheduledExecutorService
import com.hazelcast.topic.Message
import com.hazelcast.topic.MessageListener
import kotlinx.coroutines.delay
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessage
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import java.time.Duration
import java.util.UUID

import java.util.concurrent.TimeUnit

@Service
open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BlueprintClusterService {

    private val log = logger(HazelcastClusterService::class)
    lateinit var hazelcast: HazelcastInstance
    lateinit var cpSubsystemManagementService: CPSubsystemManagementService
    var joinedClient = false
    var joinedLite = false

    override suspend fun <T> startCluster(configuration: T) {
        /** Get the Hazelcast Client or Server instance */
        hazelcast =
            when (configuration) {
                is Config -> {
                    joinedLite = configuration.isLiteMember
                    val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration)
                    /** Promote as CP Member */
                    promoteAsCPMember(hazelcastInstance)
                    hazelcastInstance
                }
                is ClientConfig -> {
                    joinedClient = true
                    HazelcastClient.newHazelcastClient(configuration)
                }
                is ClusterInfo -> {

                    System.setProperty(BlueprintConstants.PROPERTY_CLUSTER_ID, configuration.id)
                    System.setProperty(BlueprintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)

                    val memberAttributeConfig = MemberAttributeConfig()
                    memberAttributeConfig.setAttribute(
                        BlueprintConstants.PROPERTY_CLUSTER_NODE_ID,
                        configuration.nodeId
                    )

                    val configFile = configuration.configFile

                    /** Check file exists */
                    val clusterConfigFile = normalizedFile(configuration.configFile)
                    check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
                        "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
                    }
                    check(clusterConfigFile.exists()) {
                        "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
                    }
                    log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")

                    /** Hazelcast Client from config file */
                    if (configuration.joinAsClient) {
                        /** Set the configuration file to system properties, so that Hazelcast will read automatically */
                        System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
                        joinedClient = true
                        val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
                        hazelcastClientConfiguration.properties = configuration.properties
                        HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
                    } else {
                        /** Hazelcast Server from config file */
                        val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
                        hazelcastServerConfiguration.clusterName = configuration.id
                        hazelcastServerConfiguration.instanceName = configuration.nodeId
                        hazelcastServerConfiguration.properties = configuration.properties
                        hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
                        joinedLite = hazelcastServerConfiguration.isLiteMember
                        val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
                        /** Promote as CP Member */
                        promoteAsCPMember(hazelcastInstance)
                        hazelcastInstance
                    }
                }
                else -> {
                    throw BlueprintProcessorException("couldn't understand the cluster configuration")
                }
            }

        /** Add the Membership Listeners */
        hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener())
        log.info(
            "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
        )
        applicationEventPublisher.publishEvent(ClusterJoinedEvent(this))
    }

    override fun isClient(): Boolean {
        return joinedClient
    }

    override fun isLiteMember(): Boolean {
        return joinedLite
    }

    override fun clusterJoined(): Boolean {
        return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
    }

    override suspend fun masterMember(partitionGroup: String): ClusterMember {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.cluster.members.first().toClusterMember()
    }

    override suspend fun allMembers(): Set<ClusterMember> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
    }

    override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
    }

    override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.getMap<String, T>(name)
    }

    /**
     * The DistributedLock is a distributed implementation of Java’s Lock.
     * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
     * determine ordering of multiple concurrent lock holders.
     * DistributedLocks are designed to account for failures within the cluster.
     * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
     * the lock will be released and granted to the next waiting process.
     */
    override suspend fun clusterLock(name: String): ClusterLock {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return ClusterLockImpl(hazelcast, name)
    }

    /** Return interface may change and it will be included in BlueprintClusterService */
    @UseExperimental
    suspend fun clusterScheduler(name: String): IScheduledExecutorService {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        return hazelcast.getScheduledExecutorService(name)
    }

    override suspend fun shutDown(duration: Duration) {
        if (::hazelcast.isInitialized && clusterJoined()) {
            delay(duration.toMillis())
            HazelcastClusterUtils.terminate(hazelcast)
        }
    }

    override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) {
        hazelcast.getReliableTopic<T>(topic.name).publish(message)
    }

    override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID {
        log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...")
        return hazelcast.getReliableTopic<T>(topic.name)
            .addMessageListener(HazelcastMessageListenerAdapter(listener))
    }

    override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean {
        log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...")
        return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid)
    }

    /** Utils */
    suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) {
        if (!joinedClient && !joinedLite) {
            HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance)
        }
    }

    suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        check(!isClient()) { "not supported for cluster client members." }
        return hazelcastApplicationMembers(ClusterUtils.applicationName())
    }

    suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
        check(::hazelcast.isInitialized) { "failed to start and join cluster" }
        val applicationMembers: MutableMap<String, Member> = hashMapOf()
        hazelcast.cluster.members.map { member ->
            val memberName: String = member.getAttribute(BlueprintConstants.PROPERTY_CLUSTER_NODE_ID)
            if (memberName.startsWith(appName, true)) {
                applicationMembers[memberName] = member
            }
        }
        return applicationMembers
    }
}

open class BlueprintsClusterMembershipListener() :
    MembershipListener {

    private val log = logger(BlueprintsClusterMembershipListener::class)

    override fun memberRemoved(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }

    override fun memberAdded(membershipEvent: MembershipEvent) {
        log.info("MembershipEvent: $membershipEvent")
    }
}

open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {

    private val log = logger(ClusterLockImpl::class)

    private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)

    override fun name(): String {
        return distributedLock.name
    }

    override suspend fun lock() {
        distributedLock.lock()
        log.trace("Cluster lock($name) created..")
    }

    override suspend fun tryLock(timeout: Long): Boolean {
        return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
            .also {
                if (it) log.trace("Cluster lock acquired: $name")
                else log.trace("Failed to acquire Cluster lock $name within timeout $timeout")
            }
    }

    override suspend fun unLock() {
        distributedLock.unlock()
        log.trace("Cluster unlock(${name()}) successfully..")
    }

    override fun isLocked(): Boolean {
        return distributedLock.isLocked
    }

    override fun isLockedByCurrentThread(): Boolean {
        return distributedLock.isLockedByCurrentThread
    }

    override suspend fun fenceLock(): String {
        val fence = distributedLock.lockAndGetFence()
        log.trace("Cluster lock($name) fence($fence) created..")
        return fence.toString()
    }

    override suspend fun tryFenceLock(timeout: Long): String {
        return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
    }

    override fun close() {
    }
}

class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> {
    override fun onMessage(message: Message<E>?) = message?.let {
        BlueprintClusterMessage<E>(
            BlueprintClusterTopic.valueOf(it.source as String),
            it.messageObject,
            it.publishTime,
            it.publishingMember.toClusterMember()
        )
    }.let { listener.onMessage(it) }
}