From feb0214caf9d9dfcf3280c8c0f63137219aafabb Mon Sep 17 00:00:00 2001 From: "Muthuramalingam, Brinda Santh" Date: Fri, 29 Mar 2019 13:39:32 -0400 Subject: Improve bulk processing Change-Id: Ia8bd447563072284aa71e00eab0ef9b93e1776b1 Issue-ID: CCSDK-1127 Signed-off-by: Muthuramalingam, Brinda Santh --- .../resolution/ResourceResolutionService.kt | 137 +++++++++++---------- .../db/ResourceResolutionResultService.kt | 23 ++-- 2 files changed, 85 insertions(+), 75 deletions(-) (limited to 'ms/blueprintsprocessor/functions/resource-resolution/src/main') diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionService.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionService.kt index dad804692..8a14ded61 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionService.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/ResourceResolutionService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. - * Modifications Copyright © 2018 IBM. + * Modifications Copyright © 2018-2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.db.ResourceResolutionResultService import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.processor.ResourceAssignmentProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.utils.ResourceAssignmentUtils @@ -39,22 +41,22 @@ interface ResourceResolutionService { fun registeredResourceSources(): List - fun resolveFromDatabase(bluePrintRuntimeService: BluePrintRuntimeService<*>, artifactTemplate: String, - resolutionKey: String): String + suspend fun resolveFromDatabase(bluePrintRuntimeService: BluePrintRuntimeService<*>, artifactTemplate: String, + resolutionKey: String): String - fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactNames: List, properties: Map): MutableMap + suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactNames: List, properties: Map): MutableMap - fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactPrefix: String, properties: Map): String + suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactPrefix: String, properties: Map): String - fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactMapping: String, artifactTemplate: String?): String + suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactMapping: String, artifactTemplate: String?): String - fun resolveResourceAssignments(blueprintRuntimeService: BluePrintRuntimeService<*>, - resourceDictionaries: MutableMap, - resourceAssignments: MutableList, - identifierName: String) + suspend fun resolveResourceAssignments(blueprintRuntimeService: BluePrintRuntimeService<*>, + resourceDictionaries: MutableMap, + resourceAssignments: MutableList, + identifierName: String) } @Service(ResourceResolutionConstants.SERVICE_RESOURCE_RESOLUTION) @@ -70,9 +72,15 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica .map { it.substringAfter(ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR) } } - override fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactNames: List, - properties: Map): MutableMap { + override suspend fun resolveFromDatabase(bluePrintRuntimeService: BluePrintRuntimeService<*>, + artifactTemplate: String, + resolutionKey: String): String { + return resolutionResultService.read(bluePrintRuntimeService, artifactTemplate, resolutionKey) + } + + override suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactNames: List, + properties: Map): MutableMap { val resolvedParams: MutableMap = hashMapOf() artifactNames.forEach { artifactName -> @@ -82,8 +90,8 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica return resolvedParams } - override fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactPrefix: String, properties: Map): String { + override suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactPrefix: String, properties: Map): String { // Velocity Artifact Definition Name val artifactTemplate = "$artifactPrefix-template" @@ -95,20 +103,17 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica if (properties.containsKey(ResourceResolutionConstants.RESOURCE_RESOLUTION_INPUT_STORE_RESULT) && properties[ResourceResolutionConstants.RESOURCE_RESOLUTION_INPUT_STORE_RESULT] as Boolean) { resolutionResultService.write(properties, result, bluePrintRuntimeService, artifactPrefix) + log.info("resolution saved into database successfully : ($properties)") } return result } - override fun resolveFromDatabase(bluePrintRuntimeService: BluePrintRuntimeService<*>, artifactTemplate: String, - resolutionKey: String): String { - return resolutionResultService.read(bluePrintRuntimeService, artifactTemplate, resolutionKey) - } - override fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, - artifactMapping: String, artifactTemplate: String?): String { + override suspend fun resolveResources(bluePrintRuntimeService: BluePrintRuntimeService<*>, nodeTemplateName: String, + artifactMapping: String, artifactTemplate: String?): String { - var resolvedContent = "" + val resolvedContent: String log.info("Resolving resource for template artifact($artifactTemplate) with resource assignment artifact($artifactMapping)") val identifierName = artifactTemplate ?: "no-template" @@ -128,7 +133,6 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica val resourceDictionaries: MutableMap = JacksonUtils.getMapFromFile(dictionaryFile, ResourceDefinition::class.java) - ?: throw BluePrintProcessorException("couldn't get Dictionary Definitions") // Resolve resources resolveResourceAssignments(bluePrintRuntimeService, resourceDictionaries, resourceAssignments, identifierName) @@ -153,46 +157,51 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica * name, then get the type of the Resource Definition, Get the instance for the Resource Type and process the * request. */ - override fun resolveResourceAssignments(blueprintRuntimeService: BluePrintRuntimeService<*>, - resourceDictionaries: MutableMap, - resourceAssignments: MutableList, - identifierName: String) { + override suspend fun resolveResourceAssignments(blueprintRuntimeService: BluePrintRuntimeService<*>, + resourceDictionaries: MutableMap, + resourceAssignments: MutableList, + identifierName: String) { val bulkSequenced = BulkResourceSequencingUtils.process(resourceAssignments) val resourceAssignmentRuntimeService = ResourceAssignmentUtils.transformToRARuntimeService(blueprintRuntimeService, identifierName) - bulkSequenced.map { batchResourceAssignments -> - batchResourceAssignments.filter { it.name != "*" && it.name != "start" } - .forEach { resourceAssignment -> - val dictionaryName = resourceAssignment.dictionaryName - val dictionarySource = resourceAssignment.dictionarySource - /** - * Get the Processor name - */ - val processorName = processorName(dictionaryName!!, dictionarySource!!, resourceDictionaries) - - val resourceAssignmentProcessor = - applicationContext.getBean(processorName) as? ResourceAssignmentProcessor - ?: throw BluePrintProcessorException("failed to get resource processor for name($processorName) " + - "for resource assignment(${resourceAssignment.name})") - try { - // Set BluePrint Runtime Service - resourceAssignmentProcessor.raRuntimeService = resourceAssignmentRuntimeService - // Set Resource Dictionaries - resourceAssignmentProcessor.resourceDictionaries = resourceDictionaries - // TODO ("Implement suspend function") - runBlocking { - // Invoke Apply Method - resourceAssignmentProcessor.applyNB(resourceAssignment) + coroutineScope { + bulkSequenced.forEach { batchResourceAssignments -> + // Execute Non Dependent Assignments in parallel ( ie asynchronously ) + val deferred = batchResourceAssignments.filter { it.name != "*" && it.name != "start" } + .map { resourceAssignment -> + async { + val dictionaryName = resourceAssignment.dictionaryName + val dictionarySource = resourceAssignment.dictionarySource + /** + * Get the Processor name + */ + val processorName = processorName(dictionaryName!!, dictionarySource!!, resourceDictionaries) + + val resourceAssignmentProcessor = + applicationContext.getBean(processorName) as? ResourceAssignmentProcessor + ?: throw BluePrintProcessorException("failed to get resource processor ($processorName) " + + "for resource assignment(${resourceAssignment.name})") + try { + // Set BluePrint Runtime Service + resourceAssignmentProcessor.raRuntimeService = resourceAssignmentRuntimeService + // Set Resource Dictionaries + resourceAssignmentProcessor.resourceDictionaries = resourceDictionaries + // Invoke Apply Method + resourceAssignmentProcessor.applyNB(resourceAssignment) + // Set errors from RA + blueprintRuntimeService.setBluePrintError(resourceAssignmentRuntimeService.getBluePrintError()) + } catch (e: RuntimeException) { + throw BluePrintProcessorException(e) + } } - // Set errors from RA - blueprintRuntimeService.setBluePrintError(resourceAssignmentRuntimeService.getBluePrintError()) - } catch (e: RuntimeException) { - throw BluePrintProcessorException(e) } - } + log.debug("Resolving (${deferred.size})resources parallel.") + deferred.awaitAll() + } } + } @@ -202,13 +211,12 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica */ private fun processorName(dictionaryName: String, dictionarySource: String, resourceDictionaries: MutableMap): String { - var processorName: String? = null - when (dictionarySource) { + val processorName: String = when (dictionarySource) { "input" -> { - processorName = "${ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR}source-input" + "${ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR}source-input" } "default" -> { - processorName = "${ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR}source-default" + "${ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR}source-default" } else -> { val resourceDefinition = resourceDictionaries[dictionaryName] @@ -217,8 +225,7 @@ open class ResourceResolutionServiceImpl(private var applicationContext: Applica val resourceSource = resourceDefinition.sources[dictionarySource] ?: throw BluePrintProcessorException("couldn't get resource definition $dictionaryName source($dictionarySource)") - processorName = ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR - .plus(resourceSource.type) + ResourceResolutionConstants.PREFIX_RESOURCE_RESOLUTION_PROCESSOR.plus(resourceSource.type) } } checkNotEmptyOrThrow(processorName, diff --git a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/db/ResourceResolutionResultService.kt b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/db/ResourceResolutionResultService.kt index 225251fce..cbc68bbc9 100644 --- a/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/db/ResourceResolutionResultService.kt +++ b/ms/blueprintsprocessor/functions/resource-resolution/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/resource/resolution/db/ResourceResolutionResultService.kt @@ -1,5 +1,6 @@ /* * Copyright (C) 2019 Bell Canada. + * Modifications Copyright © 2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +16,8 @@ */ package org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.db +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException @@ -26,8 +29,8 @@ import java.util.* @Service class ResourceResolutionResultService(private val resourceResolutionRepository: ResourceResolutionRepository) { - fun write(properties: Map, result: String, bluePrintRuntimeService: BluePrintRuntimeService<*>, - artifactPrefix: String) { + suspend fun write(properties: Map, result: String, bluePrintRuntimeService: BluePrintRuntimeService<*>, + artifactPrefix: String) = withContext(Dispatchers.IO) { val metadata = bluePrintRuntimeService.bluePrintContext().metadata!! @@ -37,7 +40,7 @@ class ResourceResolutionResultService(private val resourceResolutionRepository: resourceResolutionResult.blueprintVersion = metadata[BluePrintConstants.METADATA_TEMPLATE_VERSION] resourceResolutionResult.blueprintName = metadata[BluePrintConstants.METADATA_TEMPLATE_NAME] resourceResolutionResult.resolutionKey = - properties[ResourceResolutionConstants.RESOURCE_RESOLUTION_INPUT_KEY].toString() + properties[ResourceResolutionConstants.RESOURCE_RESOLUTION_INPUT_KEY].toString() resourceResolutionResult.result = result try { @@ -47,18 +50,18 @@ class ResourceResolutionResultService(private val resourceResolutionRepository: } } - fun read(bluePrintRuntimeService: BluePrintRuntimeService<*>, artifactPrefix: String, - resolutionKey: String): String { + suspend fun read(bluePrintRuntimeService: BluePrintRuntimeService<*>, artifactPrefix: String, + resolutionKey: String): String = withContext(Dispatchers.IO) { val metadata = bluePrintRuntimeService.bluePrintContext().metadata!! val blueprintVersion = metadata[BluePrintConstants.METADATA_TEMPLATE_VERSION] val blueprintName = metadata[BluePrintConstants.METADATA_TEMPLATE_NAME] - return resourceResolutionRepository.findByResolutionKeyAndBlueprintNameAndBlueprintVersionAndArtifactName( - resolutionKey, - blueprintName, - blueprintVersion, - artifactPrefix).result!! + resourceResolutionRepository.findByResolutionKeyAndBlueprintNameAndBlueprintVersionAndArtifactName( + resolutionKey, + blueprintName, + blueprintVersion, + artifactPrefix).result!! } } \ No newline at end of file -- cgit 1.2.3-korg