diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
6 files changed, 178 insertions, 4 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt index 1b58bc082..6637c62ec 100755 --- a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt @@ -18,6 +18,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository @@ -34,6 +36,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils import org.slf4j.LoggerFactory import org.springframework.dao.DataIntegrityViolationException @@ -60,7 +63,7 @@ class BlueprintProcessorCatalogServiceImpl( // Clean blueprint script cache val cacheKey = BluePrintFileUtils .compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version)) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) log.info("removed cba file name($name), version($version) from cache") // Cleaning Deployed Blueprint deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version) @@ -132,7 +135,7 @@ class BlueprintProcessorCatalogServiceImpl( normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion) val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) deleteNBDir(deployFile).let { if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion") @@ -174,4 +177,14 @@ class BlueprintProcessorCatalogServiceImpl( ) } } + + private suspend fun cleanClassLoader(cacheKey: String) { + val clusterService = BluePrintDependencyService.optionalClusterService() + if (null == clusterService) + BluePrintCompileCache.cleanClassLoader(cacheKey) + else { + log.info("Sending ClusterMessage: Clean Classloader Cache") + clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt new file mode 100644 index 000000000..090130f98 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt @@ -0,0 +1,21 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +enum class BlueprintClusterTopic { + BLUEPRINT_CLEAN_COMPILER_CACHE +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt index 5493bac1a..fb9056776 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance import com.hazelcast.cp.CPSubsystemManagementService import com.hazelcast.cp.lock.FencedLock import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import com.hazelcast.topic.Message +import com.hazelcast.topic.MessageListener import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants @@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Duration +import java.util.UUID + import java.util.concurrent.TimeUnit @Service -open class HazelcastClusterService : BluePrintClusterService { +open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService { private val log = logger(HazelcastClusterService::class) lateinit var hazelcast: HazelcastInstance @@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService { log.info( "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." ) + applicationEventPublisher.publishEvent(ClusterJoinedEvent(this)) } override fun isClient(): Boolean { @@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService { } } + override suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) { + hazelcast.getReliableTopic<T>(topic.name).publish(message) + } + + override fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...") + return hazelcast.getReliableTopic<T>(topic.name) + .addMessageListener(HazelcastMessageListenerAdapter(listener)) + } + + override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...") + return hazelcast.getReliableTopic<Any>(topic.name).removeMessageListener(uuid) + } + /** Utils */ suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { if (!joinedClient && !joinedLite) { @@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val override fun close() { } } + +class HazelcastMessageListenerAdapter<E>(val listener: BlueprintClusterMessageListener<E>) : MessageListener<E> { + override fun onMessage(message: Message<E>?) = message?.let { + BluePrintClusterMessage<E>( + BlueprintClusterTopic.valueOf(it.source as String), + it.messageObject, + it.publishTime, + it.publishingMember.toClusterMember() + ) + }.let { listener.onMessage(it) } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt new file mode 100644 index 000000000..3833379c9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners + +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component + +@Component +@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true") +open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener<String> { + private val log = logger(BlueprintCompilerCacheMessageListener::class) + + @EventListener(ClusterJoinedEvent::class) + fun register() { + log.info("Registering BlueprintCompilerCacheMessageListener") + clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this) + } + + override fun onMessage(message: BluePrintClusterMessage<String>?) { + message?.let { + log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})") + BluePrintCompileCache.cleanClassLoader(it.payload) + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 8cb9f4f92..f7ba6f25f 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -16,8 +16,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.springframework.context.ApplicationEvent import java.time.Duration import java.util.Properties +import java.util.UUID interface BluePrintClusterService { @@ -53,6 +56,15 @@ interface BluePrintClusterService { /** Shut down the cluster with [duration] */ suspend fun shutDown(duration: Duration) + + /** Send [message] to the listener(s) of a [topic] */ + suspend fun <T> sendMessage(topic: BlueprintClusterTopic, message: T) + + /** Register a [listener] to a [topic] and returns his UUID */ + fun <T> addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener<T>): UUID + + /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */ + fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean } data class ClusterInfo( @@ -83,4 +95,12 @@ interface ClusterLock { fun close() } +class BluePrintClusterMessage<E>(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember) + +interface BlueprintClusterMessageListener<E> { + fun onMessage(message: BluePrintClusterMessage<E>?) +} + +class ClusterJoinedEvent(source: Any) : ApplicationEvent(source) + const val CDS_LOCK_GROUP = "cds-lock" diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt index da5564235..ded017940 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member import com.hazelcast.config.FileSystemYamlConfig import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap +import io.mockk.mockk import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext import org.junit.After import org.junit.Before import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive @@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import java.io.Serializable import java.util.Properties import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -91,6 +95,40 @@ class HazelcastClusterServiceTest { } } + @Test + fun testClusterMessaging() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(1, 2, 3)).toMutableList() + printReachableMembers(bluePrintClusterServiceOne) + testMessageReceived(bluePrintClusterServiceOne) + } + } + + private suspend fun testMessageReceived(bluePrintClusterServices: List<BluePrintClusterService>) { + val sender = bluePrintClusterServices[0] as HazelcastClusterService + val receiver = bluePrintClusterServices[1] as HazelcastClusterService + val messageSent = "hello world" + var isMessageReceived = false + val uuid = receiver.addBlueprintClusterMessageListener( + BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, + object : BlueprintClusterMessageListener<String> { + override fun onMessage(message: BluePrintClusterMessage<String>?) { + log.info("Message received - ${message?.payload}") + isMessageReceived = messageSent == message?.payload + } + } + ) + + assertNotNull(uuid) + sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent) + delay(1000) + assertTrue(isMessageReceived) + + assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + } + private suspend fun createCluster( ids: List<Int>, joinAsClient: Boolean? = false @@ -117,7 +155,7 @@ class HazelcastClusterServiceTest { properties = properties ) } - val hazelcastClusterService = HazelcastClusterService() + val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true)) hazelcastClusterService.startCluster(clusterInfo) hazelcastClusterService } |