From 55e4780ba5f1de52060eaf76608a588b5be7c788 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Wed, 2 Dec 2020 16:25:08 -0500 Subject: Fixed NoClassDefFoundError when USE_SCRIPT_COMPILE_CACHE is set to false USE_SCRIPT_COMPILE_CACHE set to false cleans the Class Loader cache after each kotlin script execution. When several kotlin script are executed in parallel (ie no dependency between them) and USE_SCRIPT_COMPILE_CACHE=false then the class loader from the cache may be deleted before one of those executed kotlin script get the time to finish which to the NoClassDefFoundError. Removed cleanupInstance method for kotlin script executors that where causing the class loader to be removed prematurely. Now the behaviour is to remove the class loader from the cache only when we publish a new CBA which was already the case when CDS run with a single instance. In cluster mode, a topic has been added to hazelcast to allow the instance publishing the updated CBA to communicate to the other instances by sending a message to clean the class loader for this CBA from their cache. Added mutex on kotlin script compilation to fix race condition. For concurrent kotlin script execution each process wanted to compile an executable but it was causing a race condition if a process tries to execute while another still compile. Mutex on the execution path prevent this behaviour Issue-ID: CCSDK-3052 Signed-off-by: Julien Fontaine Signed-off-by: Jozsef Csongvai Change-Id: I6ab002352b3272898ad0b183341ee664652c8ae3 --- .../netconf/executor/ComponentNetconfExecutor.kt | 5 --- .../CapabilityResourceResolutionProcessor.kt | 2 - .../IpAssignResolutionCapabilityTest.kt | 4 -- .../capabilities/NamingResolutionCapabilityTest.kt | 6 +-- .../CapabilityResourceResolutionProcessorTest.kt | 8 ---- .../core/BluePrintConstants.kt | 2 - .../core/interfaces/BluePrintScriptsService.kt | 2 - .../core/scripts/BluePrintCompileService.kt | 14 ++++++- .../core/scripts/BluePrintScriptsServiceImpl.kt | 7 ---- .../BlueprintProcessorCatalogServiceImpl.kt | 17 +++++++- .../core/cluster/BlueprintClusterTopic.kt | 21 ++++++++++ .../core/cluster/HazelcastClusterService.kt | 37 ++++++++++++++++- .../BlueprintCompilerCacheMessageListener.kt | 47 ++++++++++++++++++++++ .../core/service/BluePrintClusterService.kt | 20 +++++++++ .../core/cluster/HazelcastClusterServiceTest.kt | 40 +++++++++++++++++- .../execution/ComponentFunctionScriptingService.kt | 6 --- .../services/execution/ComponentScriptExecutor.kt | 2 - 17 files changed, 192 insertions(+), 48 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt index b64790065..49b9207db 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/ComponentNetconfExecutor.kt @@ -66,11 +66,6 @@ open class ComponentNetconfExecutor(private var componentFunctionScriptingServic // Handles both script processing and error handling scriptComponent.executeScript(executionServiceInput) - - componentFunctionScriptingService.cleanupInstance( - bluePrintRuntimeService.bluePrintContext(), - scriptType - ) } override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt index b1dc14091..1a557f5bc 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessor.kt @@ -83,8 +83,6 @@ open class CapabilityResourceResolutionProcessor(private var componentFunctionSc // Invoke componentResourceAssignmentProcessor componentResourceAssignmentProcessor!!.executeScript(resourceAssignment) - - componentFunctionScriptingService.cleanupInstance(raRuntimeService.bluePrintContext(), scriptType) } } diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt index 8a954c130..4ce5df18f 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/IpAssignResolutionCapabilityTest.kt @@ -86,10 +86,6 @@ class IpAssignResolutionCapabilityTest { .scriptInstance(any(), any(), any()) } returns IpAssignResolutionCapability() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk() every { raRuntimeService.bluePrintContext() } returns mockk() every { raRuntimeService.getInputValue("fixed_ipv4_Address_01") } returns NullNode.getInstance() diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt index 449845f7f..8c0aca49e 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/capabilities/NamingResolutionCapabilityTest.kt @@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.util import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ComponentFunctionScriptingService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintError import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType @@ -87,11 +88,8 @@ class NamingResolutionCapabilityTest { .scriptInstance(any(), any(), any()) } returns NamingResolutionCapability() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk() + every { raRuntimeService.getBluePrintError() } returns BluePrintError() every { raRuntimeService.bluePrintContext() } returns mockk() every { raRuntimeService.getInputValue("vf-module-name") } returns NullNode.getInstance() every { raRuntimeService.getInputValue("vnfc-name") } returns NullNode.getInstance() diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt index f618b41db..1b0058b90 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/processor/CapabilityResourceResolutionProcessorTest.kt @@ -52,10 +52,6 @@ class CapabilityResourceResolutionProcessorTest { .scriptInstance(any(), any(), any()) } returns MockCapabilityScriptRA() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val raRuntimeService = mockk() every { raRuntimeService.bluePrintContext() } returns mockk() every { raRuntimeService.getInputValue("test-property") } returns NullNode.getInstance() @@ -100,10 +96,6 @@ class CapabilityResourceResolutionProcessorTest { .scriptInstance(any(), BluePrintConstants.SCRIPT_JYTHON, any()) } returns MockCapabilityScriptRA() - coEvery { - componentFunctionScriptingService.cleanupInstance(any(), any()) - } returns mockk() - val resourceAssignmentRuntimeService = ResourceAssignmentRuntimeService("1234", bluePrintContext) val capabilityResourceResolutionProcessor = diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 351cf4776..9439c2d98 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -235,8 +235,6 @@ object BluePrintConstants { const val TOSCA_SPEC = "TOSCA" - val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean() - const val LOG_PROTECT: String = "log-protect" /** Cluster Properties */ diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt index 0f7cb79f2..aa61b0c4d 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/interfaces/BluePrintScriptsService.kt @@ -36,6 +36,4 @@ interface BluePrintScriptsService { suspend fun scriptInstance(cacheKey: String, scriptClassName: String): T suspend fun scriptInstance(scriptClassName: String): T - - suspend fun cleanupInstance(blueprintBasePath: String) } diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt index d1b42ffe5..230097f3c 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintCompileService.kt @@ -16,8 +16,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.scripts +import com.google.common.cache.CacheBuilder +import com.google.common.cache.CacheLoader +import com.google.common.cache.LoadingCache import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.jetbrains.kotlin.cli.common.ExitCode import org.jetbrains.kotlin.cli.common.arguments.parseCommandLineArguments import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation @@ -44,6 +49,8 @@ open class BluePrintCompileService { val classPaths = classpathFromClasspathProperty()?.joinToString(File.pathSeparator) { it.absolutePath } + val mutexCache: LoadingCache = CacheBuilder.newBuilder() + .build(CacheLoader.from { s -> Mutex() }) } /** Compile the [bluePrintSourceCode] and get the [kClassName] instance for the constructor [args] */ @@ -54,8 +61,11 @@ open class BluePrintCompileService { ): T { /** Compile the source code if needed */ log.debug("Jar Exists : ${bluePrintSourceCode.targetJarFile.exists()}, Regenerate : ${bluePrintSourceCode.regenerate}") - if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) { - compile(bluePrintSourceCode) + + mutexCache.get(bluePrintSourceCode.targetJarFile.absolutePath).withLock { + if (!bluePrintSourceCode.targetJarFile.exists() || bluePrintSourceCode.regenerate) { + compile(bluePrintSourceCode) + } } val classLoaderWithDependencies = BluePrintCompileCache.classLoader(bluePrintSourceCode.cacheKey) diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt index fa8ca2719..f3eb1a2b9 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/scripts/BluePrintScriptsServiceImpl.kt @@ -79,11 +79,4 @@ open class BluePrintScriptsServiceImpl : BluePrintScriptsService { return Thread.currentThread().contextClassLoader.loadClass(scriptClassName).constructors .single().newInstance(*args.toArray()) as T } - - override suspend fun cleanupInstance(blueprintBasePath: String) { - if (!BluePrintConstants.USE_SCRIPT_COMPILE_CACHE) { - log.info("Invalidating compile cache for blueprint ($blueprintBasePath)") - BluePrintCompileCache.cleanClassLoader(blueprintBasePath) - } - } } diff --git a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt index 1b58bc082..6637c62ec 100755 --- a/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/commons/db-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/db/primary/service/BlueprintProcessorCatalogServiceImpl.kt @@ -18,6 +18,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.db.primary.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModel import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelContent import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.repository.BlueprintModelRepository @@ -34,6 +36,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName import org.onap.ccsdk.cds.controllerblueprints.core.reCreateNBDirs import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils import org.slf4j.LoggerFactory import org.springframework.dao.DataIntegrityViolationException @@ -60,7 +63,7 @@ class BlueprintProcessorCatalogServiceImpl( // Clean blueprint script cache val cacheKey = BluePrintFileUtils .compileCacheKey(normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, name, version)) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) log.info("removed cba file name($name), version($version) from cache") // Cleaning Deployed Blueprint deleteNBDir(bluePrintLoadConfiguration.blueprintDeployPath, name, version) @@ -132,7 +135,7 @@ class BlueprintProcessorCatalogServiceImpl( normalizedPathName(bluePrintLoadConfiguration.blueprintDeployPath, artifactName, artifactVersion) val cacheKey = BluePrintFileUtils.compileCacheKey(deployFile) - BluePrintCompileCache.cleanClassLoader(cacheKey) + cleanClassLoader(cacheKey) deleteNBDir(deployFile).let { if (it) log.info("Deleted deployed blueprint model :$artifactName::$artifactVersion") @@ -174,4 +177,14 @@ class BlueprintProcessorCatalogServiceImpl( ) } } + + private suspend fun cleanClassLoader(cacheKey: String) { + val clusterService = BluePrintDependencyService.optionalClusterService() + if (null == clusterService) + BluePrintCompileCache.cleanClassLoader(cacheKey) + else { + log.info("Sending ClusterMessage: Clean Classloader Cache") + clusterService.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, cacheKey) + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt new file mode 100644 index 000000000..090130f98 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BlueprintClusterTopic.kt @@ -0,0 +1,21 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +enum class BlueprintClusterTopic { + BLUEPRINT_CLEAN_COMPILER_CACHE +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt index 5493bac1a..fb9056776 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -30,9 +30,14 @@ import com.hazelcast.core.HazelcastInstance import com.hazelcast.cp.CPSubsystemManagementService import com.hazelcast.cp.lock.FencedLock import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import com.hazelcast.topic.Message +import com.hazelcast.topic.MessageListener import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants @@ -40,12 +45,15 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Duration +import java.util.UUID + import java.util.concurrent.TimeUnit @Service -open class HazelcastClusterService : BluePrintClusterService { +open class HazelcastClusterService(private val applicationEventPublisher: ApplicationEventPublisher) : BluePrintClusterService { private val log = logger(HazelcastClusterService::class) lateinit var hazelcast: HazelcastInstance @@ -123,6 +131,7 @@ open class HazelcastClusterService : BluePrintClusterService { log.info( "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." ) + applicationEventPublisher.publishEvent(ClusterJoinedEvent(this)) } override fun isClient(): Boolean { @@ -184,6 +193,21 @@ open class HazelcastClusterService : BluePrintClusterService { } } + override suspend fun sendMessage(topic: BlueprintClusterTopic, message: T) { + hazelcast.getReliableTopic(topic.name).publish(message) + } + + override fun addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener): UUID { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) listening to topic($topic)...") + return hazelcast.getReliableTopic(topic.name) + .addMessageListener(HazelcastMessageListenerAdapter(listener)) + } + + override fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean { + log.info("Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) has stopped listening to topic($topic)...") + return hazelcast.getReliableTopic(topic.name).removeMessageListener(uuid) + } + /** Utils */ suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { if (!joinedClient && !joinedLite) { @@ -273,3 +297,14 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val override fun close() { } } + +class HazelcastMessageListenerAdapter(val listener: BlueprintClusterMessageListener) : MessageListener { + override fun onMessage(message: Message?) = message?.let { + BluePrintClusterMessage( + BlueprintClusterTopic.valueOf(it.source as String), + it.messageObject, + it.publishTime, + it.publishingMember.toClusterMember() + ) + }.let { listener.onMessage(it) } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt new file mode 100644 index 000000000..3833379c9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/listeners/BlueprintCompilerCacheMessageListener.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2020 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.listeners + +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterJoinedEvent +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Component + +@Component +@ConditionalOnProperty("CLUSTER_ENABLED", havingValue = "true") +open class BlueprintCompilerCacheMessageListener(private val clusterService: BluePrintClusterService) : BlueprintClusterMessageListener { + private val log = logger(BlueprintCompilerCacheMessageListener::class) + + @EventListener(ClusterJoinedEvent::class) + fun register() { + log.info("Registering BlueprintCompilerCacheMessageListener") + clusterService.addBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, this) + } + + override fun onMessage(message: BluePrintClusterMessage?) { + message?.let { + log.info("Received ClusterMessage - Cleaning compile cache for blueprint (${it.payload})") + BluePrintCompileCache.cleanClassLoader(it.payload) + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 8cb9f4f92..f7ba6f25f 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -16,8 +16,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.BlueprintClusterTopic +import org.springframework.context.ApplicationEvent import java.time.Duration import java.util.Properties +import java.util.UUID interface BluePrintClusterService { @@ -53,6 +56,15 @@ interface BluePrintClusterService { /** Shut down the cluster with [duration] */ suspend fun shutDown(duration: Duration) + + /** Send [message] to the listener(s) of a [topic] */ + suspend fun sendMessage(topic: BlueprintClusterTopic, message: T) + + /** Register a [listener] to a [topic] and returns his UUID */ + fun addBlueprintClusterMessageListener(topic: BlueprintClusterTopic, listener: BlueprintClusterMessageListener): UUID + + /** Unregister a listener from a [topic] using his [uuid] and returns true if it succeeded */ + fun removeBlueprintClusterMessageListener(topic: BlueprintClusterTopic, uuid: UUID): Boolean } data class ClusterInfo( @@ -83,4 +95,12 @@ interface ClusterLock { fun close() } +class BluePrintClusterMessage(val topic: BlueprintClusterTopic, val payload: E, publishTime: Long, clusterMember: ClusterMember) + +interface BlueprintClusterMessageListener { + fun onMessage(message: BluePrintClusterMessage?) +} + +class ClusterJoinedEvent(source: Any) : ApplicationEvent(source) + const val CDS_LOCK_GROUP = "cds-lock" diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt index da5564235..ded017940 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -22,6 +22,7 @@ import com.hazelcast.cluster.Member import com.hazelcast.config.FileSystemYamlConfig import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap +import io.mockk.mockk import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -32,7 +33,9 @@ import kotlinx.coroutines.withContext import org.junit.After import org.junit.Before import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterMessage import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BlueprintClusterMessageListener import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive @@ -41,6 +44,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import java.io.Serializable import java.util.Properties import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -91,6 +95,40 @@ class HazelcastClusterServiceTest { } } + @Test + fun testClusterMessaging() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(1, 2, 3)).toMutableList() + printReachableMembers(bluePrintClusterServiceOne) + testMessageReceived(bluePrintClusterServiceOne) + } + } + + private suspend fun testMessageReceived(bluePrintClusterServices: List) { + val sender = bluePrintClusterServices[0] as HazelcastClusterService + val receiver = bluePrintClusterServices[1] as HazelcastClusterService + val messageSent = "hello world" + var isMessageReceived = false + val uuid = receiver.addBlueprintClusterMessageListener( + BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, + object : BlueprintClusterMessageListener { + override fun onMessage(message: BluePrintClusterMessage?) { + log.info("Message received - ${message?.payload}") + isMessageReceived = messageSent == message?.payload + } + } + ) + + assertNotNull(uuid) + sender.sendMessage(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, messageSent) + delay(1000) + assertTrue(isMessageReceived) + + assertTrue(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + assertFalse(receiver.removeBlueprintClusterMessageListener(BlueprintClusterTopic.BLUEPRINT_CLEAN_COMPILER_CACHE, uuid)) + } + private suspend fun createCluster( ids: List, joinAsClient: Boolean? = false @@ -117,7 +155,7 @@ class HazelcastClusterServiceTest { properties = properties ) } - val hazelcastClusterService = HazelcastClusterService() + val hazelcastClusterService = HazelcastClusterService(mockk(relaxed = true)) hazelcastClusterService.startCluster(clusterInfo) hazelcastClusterService } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt index d107f01e5..3483ce1d7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt @@ -101,10 +101,4 @@ class ComponentFunctionScriptingService( } return scriptComponent } - - suspend fun cleanupInstance(bluePrintContext: BluePrintContext, scriptType: String) { - if (scriptType == BluePrintConstants.SCRIPT_KOTLIN) { - BluePrintScriptsServiceImpl().cleanupInstance(bluePrintContext.rootPath) - } - } } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt index 34eaf6226..a7ef0a8a0 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentScriptExecutor.kt @@ -61,8 +61,6 @@ open class ComponentScriptExecutor(private var componentFunctionScriptingService // Handles both script processing and error handling scriptComponentFunction.executeScript(executionServiceInput) - - componentFunctionScriptingService.cleanupInstance(bluePrintRuntimeService.bluePrintContext(), scriptType) } override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) { -- cgit 1.2.3-korg