diff options
23 files changed, 629 insertions, 143 deletions
diff --git a/cds-ui/client/src/app/feature-modules/blueprint/blueprint.module.ts b/cds-ui/client/src/app/feature-modules/blueprint/blueprint.module.ts index 27803ce56..edbaca67f 100644 --- a/cds-ui/client/src/app/feature-modules/blueprint/blueprint.module.ts +++ b/cds-ui/client/src/app/feature-modules/blueprint/blueprint.module.ts @@ -32,10 +32,12 @@ import { DeployTemplateModule } from './deploy-template/deploy-template.module'; import { TestTemplateModule } from './test-template/test-template.module'; import { AppMaterialModule } from '../../../app/common/modules/app-material.module'; import { ReactiveFormsModule } from '@angular/forms'; +import { ZipfileExtractionComponent } from './common-module/zipfile-extraction/zipfile-extraction.component'; @NgModule({ declarations: [ - BlueprintComponent + BlueprintComponent, + ZipfileExtractionComponent ], imports: [ CommonModule, diff --git a/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.html b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.html new file mode 100644 index 000000000..9b7f44b1c --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.html @@ -0,0 +1,20 @@ +<!-- +============LICENSE_START========================================== +=================================================================== +Copyright (C) 2019 IBM Intellectual Property. All rights reserved. +=================================================================== + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the License); +you may not use this software except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END============================================ +-->
\ No newline at end of file diff --git a/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.scss b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.scss new file mode 100644 index 000000000..93f5c9dea --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.scss @@ -0,0 +1,20 @@ +/* +============LICENSE_START========================================== +=================================================================== +Copyright (C) 2019 IBM Intellectual Property. All rights reserved. +=================================================================== + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the License); +you may not use this software except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END============================================ +*/
\ No newline at end of file diff --git a/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.spec.ts b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.spec.ts new file mode 100644 index 000000000..a6674caae --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.spec.ts @@ -0,0 +1,46 @@ +/* +============LICENSE_START========================================== +=================================================================== +Copyright (C) 2019 IBM Intellectual Property. All rights reserved. +=================================================================== + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the License); +you may not use this software except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END============================================ +*/ + +import { async, ComponentFixture, TestBed } from '@angular/core/testing'; + +import { ZipfileExtractionComponent } from './zipfile-extraction.component'; + +describe('ZipfileExtractionComponent', () => { + let component: ZipfileExtractionComponent; + let fixture: ComponentFixture<ZipfileExtractionComponent>; + + beforeEach(async(() => { + TestBed.configureTestingModule({ + declarations: [ ZipfileExtractionComponent ] + }) + .compileComponents(); + })); + + beforeEach(() => { + fixture = TestBed.createComponent(ZipfileExtractionComponent); + component = fixture.componentInstance; + fixture.detectChanges(); + }); + + it('should create', () => { + expect(component).toBeTruthy(); + }); +}); diff --git a/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.ts b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.ts new file mode 100644 index 000000000..2683ff5f4 --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/common-module/zipfile-extraction/zipfile-extraction.component.ts @@ -0,0 +1,136 @@ +/* +============LICENSE_START========================================== +=================================================================== +Copyright (C) 2019 IBM Intellectual Property. All rights reserved. +=================================================================== + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the License); +you may not use this software except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END============================================ +*/ +import { Component, OnInit } from '@angular/core'; +import * as JSZip from 'jszip'; +import { SortPipe } from '../../../../common/shared/pipes/sort.pipe'; +import { LoaderService } from '../../../../common/core/services/loader.service'; + +@Component({ + selector: 'app-zipfile-extraction', + templateUrl: './zipfile-extraction.component.html', + styleUrls: ['./zipfile-extraction.component.scss'] +}) +export class ZipfileExtractionComponent implements OnInit { + private paths = []; + private tree; + private zipFile: JSZip = new JSZip(); + private fileObject: any; + private activationBlueprint: any; + private tocsaMetadaData: any; + private blueprintName: string; + private entryDefinition: string; + validfile: boolean = false; + uploadedFileName: string; + + constructor(private loader: LoaderService) { } + + ngOnInit() { + } + async buildFileViewData(zip) { + this.validfile = false; + this.paths = []; + console.log(zip.files); + for (var file in zip.files) { + console.log("name: " + zip.files[file].name); + this.fileObject = { + // nameForUIDisplay: this.uploadedFileName + '/' + zip.files[file].name, + // name: zip.files[file].name, + name: this.uploadedFileName + '/' + zip.files[file].name, + data: '' + }; + const value = <any>await zip.files[file].async('string'); + this.fileObject.data = value; + this.paths.push(this.fileObject); + } + + if (this.paths) { + this.paths.forEach(path => { + if (path.name.includes("TOSCA.meta")) { + this.validfile = true + } + }); + } else { + alert('Please update proper file'); + } + + if (this.validfile) { + this.fetchTOSACAMetadata(); + this.paths = new SortPipe().transform(this.paths, 'asc', 'name'); + this.tree = this.arrangeTreeData(this.paths); + } else { + alert('Please update proper file with TOSCA metadata'); + } + } + + arrangeTreeData(paths) { + const tree = []; + + paths.forEach((path) => { + + const pathParts = path.name.split('/'); + // pathParts.shift(); + let currentLevel = tree; + + pathParts.forEach((part) => { + const existingPath = currentLevel.filter(level => level.name === part); + + if (existingPath.length > 0) { + currentLevel = existingPath[0].children; + } else { + const newPart = { + name: part, + children: [], + data: path.data, + path: path.name + }; + if (part.trim() == this.blueprintName.trim()) { + this.activationBlueprint = path.data; + newPart.data = JSON.parse(this.activationBlueprint.toString()); + console.log('newpart', newPart); + this.entryDefinition = path.name.trim(); + } + if (newPart.name !== '') { + currentLevel.push(newPart); + currentLevel = newPart.children; + } + } + }); + }); + this.loader.hideLoader(); + return tree; + } + + fetchTOSACAMetadata() { + let toscaData = {}; + this.paths.forEach(file => { + if (file.name.includes('TOSCA.meta')) { + let keys = file.data.split("\n"); + keys.forEach((key) => { + let propertyData = key.split(':'); + toscaData[propertyData[0]] = propertyData[1]; + }); + } + }); + this.blueprintName = (((toscaData['Entry-Definitions']).split('/'))[1]).toString();; + console.log(toscaData); + } + +} diff --git a/cds-ui/client/src/app/feature-modules/blueprint/select-template/metadata/metadata.component.ts b/cds-ui/client/src/app/feature-modules/blueprint/select-template/metadata/metadata.component.ts index cefe0fd93..174bdf183 100644 --- a/cds-ui/client/src/app/feature-modules/blueprint/select-template/metadata/metadata.component.ts +++ b/cds-ui/client/src/app/feature-modules/blueprint/select-template/metadata/metadata.component.ts @@ -30,7 +30,7 @@ import { IBlueprint } from 'src/app/common/core/store/models/blueprint.model'; import { IMetaData } from '../../../../common/core/store/models/metadata.model'; import { SetBlueprintState } from 'src/app/common/core/store/actions/blueprint.action'; import { LoaderService } from '../../../../common/core/services/loader.service'; - +import { SelectTemplateService } from 'src/app/feature-modules/blueprint/select-template/select-template.service'; @Component({ selector: 'app-metadata', templateUrl: './metadata.component.html', @@ -48,8 +48,9 @@ export class MetadataComponent implements OnInit { blueprintName: string; uploadedFileName: string; entryDefinition: string; - - constructor(private formBuilder: FormBuilder, private store: Store<IAppState>, private loader: LoaderService) { + + constructor(private formBuilder: FormBuilder, private store: Store<IAppState>, + private loader: LoaderService, private dataService: SelectTemplateService) { this.bpState = this.store.select('blueprint'); this.CBAMetadataForm = this.formBuilder.group({ template_author: ['', Validators.required], @@ -62,6 +63,9 @@ export class MetadataComponent implements OnInit { } ngOnInit() { + this.dataService.getCbaOption().subscribe( + res => {console.log("data from service " + res);} + ); this.bpState.subscribe( blueprintdata => { var blueprintState: IBlueprintState = { blueprint: blueprintdata.blueprint, isLoadSuccess: blueprintdata.isLoadSuccess, isSaveSuccess: blueprintdata.isSaveSuccess, isUpdateSuccess: blueprintdata.isUpdateSuccess }; @@ -95,18 +99,22 @@ export class MetadataComponent implements OnInit { }); }) } - +ngAfterInit(){ + this.dataService.getCbaOption().subscribe( + res => {console.log("data from service after init" + res);} + ); +} UploadMetadata() { this.loader.showLoader(); this.metadata = Object.assign({}, this.CBAMetadataForm.value); this.blueprint.metadata = this.metadata; - if( this.blueprint && - this.blueprint['topology_template'] && - this.blueprint['topology_template'].workflows && - this.blueprint['topology_template'].workflows['resource-assignment'] && - this.blueprint['topology_template'].workflows['resource-assignment'].name) { - delete this.blueprint['topology_template'].workflows['resource-assignment'].name; - } + /*if (this.blueprint && + this.blueprint['topology_template'] && + this.blueprint['topology_template'].workflows && + this.blueprint['topology_template'].workflows['resource-assignment'] && + this.blueprint['topology_template'].workflows['resource-assignment'].name) { + delete this.blueprint['topology_template'].workflows['resource-assignment'].name; + }*/ this.filesData.forEach((fileNode) => { if (fileNode.name.includes(this.blueprintName) && fileNode.name == this.entryDefinition) { fileNode.data = JSON.stringify(this.blueprint, null, "\t"); diff --git a/cds-ui/client/src/app/feature-modules/blueprint/select-template/search-template/search-template.module.ts b/cds-ui/client/src/app/feature-modules/blueprint/select-template/search-template/search-template.module.ts index 11029663b..9bafaebdc 100644 --- a/cds-ui/client/src/app/feature-modules/blueprint/select-template/search-template/search-template.module.ts +++ b/cds-ui/client/src/app/feature-modules/blueprint/select-template/search-template/search-template.module.ts @@ -26,6 +26,8 @@ import { SearchTemplateComponent } from './search-template.component'; import { ReactiveFormsModule } from '@angular/forms'; import { AppMaterialModule } from 'src/app/common/modules/app-material.module'; import { SharedModule} from 'src/app/common/shared/shared.module'; +import { SelectTemplateService } from 'src/app/feature-modules/blueprint/select-template/select-template.service'; + @NgModule({ declarations: [ SearchTemplateComponent, @@ -39,6 +41,7 @@ import { SharedModule} from 'src/app/common/shared/shared.module'; exports:[ SearchTemplateComponent, SearchFromDatabaseComponent - ] + ], + providers:[ SelectTemplateService] }) export class SearchTemplateModule { } diff --git a/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.spec.ts b/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.spec.ts new file mode 100644 index 000000000..e5e2d0256 --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.spec.ts @@ -0,0 +1,12 @@ +import { TestBed } from '@angular/core/testing'; + +import { SelectTemplateService } from './select-template.service'; + +describe('SelectTemplateService', () => { + beforeEach(() => TestBed.configureTestingModule({})); + + it('should be created', () => { + const service: SelectTemplateService = TestBed.get(SelectTemplateService); + expect(service).toBeTruthy(); + }); +}); diff --git a/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.ts b/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.ts new file mode 100644 index 000000000..fa18cbd25 --- /dev/null +++ b/cds-ui/client/src/app/feature-modules/blueprint/select-template/select-template.service.ts @@ -0,0 +1,40 @@ +/* +============LICENSE_START========================================== +=================================================================== +Copyright (C) 2019 IBM Intellectual Property. All rights reserved. +=================================================================== + +Unless otherwise specified, all software contained herein is licensed +under the Apache License, Version 2.0 (the License); +you may not use this software except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============LICENSE_END============================================ +*/ + +import { Injectable } from '@angular/core'; +import { Observable, of } from 'rxjs'; + +@Injectable({ + providedIn: 'root' +}) +export class SelectTemplateService { + cbaOption: string; + + constructor() { } + + setCbaOption(option: string) { + this.cbaOption = option; + } + + getCbaOption(): Observable<string> { + return of(this.cbaOption); + } +} diff --git a/components/model-catalog/blueprint-model/test-blueprint/remote_ansible/Definitions/node_types.json b/components/model-catalog/blueprint-model/test-blueprint/remote_ansible/Definitions/node_types.json index 5f0deeb98..cb9614eed 100644 --- a/components/model-catalog/blueprint-model/test-blueprint/remote_ansible/Definitions/node_types.json +++ b/components/model-catalog/blueprint-model/test-blueprint/remote_ansible/Definitions/node_types.json @@ -24,8 +24,13 @@ "process" : { "inputs" : { "job-template-name" : { - "description" : "Job template to execute in AWX", - "required" : true, + "description" : "Primary key or name of the job template to launch new job.", + "required" : false, + "type" : "string" + }, + "workflow-job-template-id" : { + "description" : "Primary key (name not supported) of the workflow job template to launch new job.", + "required" : false, "type" : "string" }, "limit" : { diff --git a/components/model-catalog/definition-type/starter-type/node_type/component-remote-ansible-executor.json b/components/model-catalog/definition-type/starter-type/node_type/component-remote-ansible-executor.json index 508380c91..f5d9d3f7a 100644 --- a/components/model-catalog/definition-type/starter-type/node_type/component-remote-ansible-executor.json +++ b/components/model-catalog/definition-type/starter-type/node_type/component-remote-ansible-executor.json @@ -30,6 +30,11 @@ "required": true, "type": "string" }, + "workflow-job-template-id": { + "description": "Primary key (name not supported) of the workflow job template to launch new job.", + "required": false, + "type": "string" + }, "limit": { "description": "Specify host limit for job template to run.", "required": false, diff --git a/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt b/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt index 947a9630d..743aa714b 100644 --- a/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt +++ b/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt @@ -24,10 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.isNotNull -import org.onap.ccsdk.cds.controllerblueprints.core.rootFieldsToMap +import org.onap.ccsdk.cds.controllerblueprints.core.* import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.slf4j.LoggerFactory import org.springframework.beans.factory.config.ConfigurableBeanFactory @@ -68,6 +65,7 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe // input fields names accepted by this executor const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector" const val INPUT_JOB_TEMPLATE_NAME = "job-template-name" + const val INPUT_WORKFLOW_JOB_TEMPLATE_NAME = "workflow-job-template-id" const val INPUT_LIMIT_TO_HOST = "limit" const val INPUT_INVENTORY = "inventory" const val INPUT_EXTRA_VARS = "extra-vars" @@ -85,12 +83,20 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe try { val restClientService = getAWXRestClient() - val jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).asText() - val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName) + // Get either a job template name or a workflow template name property + var workflowURIPrefix = "" + var jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).returnNullIfMissing()?.textValue() ?: "" + val isWorkflowJT = jobTemplateName.isBlank() + if (isWorkflowJT) { + jobTemplateName = getOperationInput(INPUT_WORKFLOW_JOB_TEMPLATE_NAME).asText() + workflowURIPrefix = "workflow_" + } + + val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName, workflowURIPrefix) if (jtId.isNotEmpty()) { - runJobTemplateOnAWX(restClientService, jobTemplateName, jtId) + runJobTemplateOnAWX(restClientService, jobTemplateName, jtId, workflowURIPrefix) } else { - val message = "Job template ${jobTemplateName} does not exists" + val message = "Workflow/Job template ${jobTemplateName} does not exists" log.error(message) setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message) } @@ -135,9 +141,10 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe /** * Finds the job template ID based on the job template name provided in the request */ - private fun lookupJobTemplateIDByName(awxClient: BlueprintWebClientService, job_template_name: String?): String { + private fun lookupJobTemplateIDByName(awxClient: BlueprintWebClientService, job_template_name: String?, + workflowPrefix : String) : String { val encodedJTName = URI(null, null, - "/api/v2/job_templates/${job_template_name}/", + "/api/v2/${workflowPrefix}job_templates/${job_template_name}/", null, null).rawPath // Get Job Template details by name @@ -152,19 +159,20 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe * its execution. Finally, it retrieves the job results via the stdout api. * The status and output attributes are populated in the process. */ - private fun runJobTemplateOnAWX(awxClient: BlueprintWebClientService, job_template_name: String?, jtId: String) { + private fun runJobTemplateOnAWX(awxClient: BlueprintWebClientService, job_template_name: String?, jtId: String, + workflowPrefix : String) { setNodeOutputProperties("preparing".asJsonPrimitive(), "".asJsonPrimitive()) // Get Job Template requirements - var response = awxClient.exchangeResource(GET, "/api/v2/job_templates/${jtId}/launch/", "") + var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", "") // FIXME: handle non-successful SC val jtLaunchReqs: JsonNode = mapper.readTree(response.body) - val payload = prepareLaunchPayload(awxClient, jtLaunchReqs) + val payload = prepareLaunchPayload(awxClient, jtLaunchReqs, workflowPrefix.isBlank()) log.info("Running job with $payload, for requestId $processId.") // Launch the job for the targeted template var jtLaunched: JsonNode = JacksonUtils.objectMapper.createObjectNode() - response = awxClient.exchangeResource(POST, "/api/v2/job_templates/${jtId}/launch/", payload) + response = awxClient.exchangeResource(POST, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", payload) if (response.status in HTTP_SUCCESS) { jtLaunched = mapper.readTree(response.body) val fieldsIgnored: JsonNode = jtLaunched.at("/ignored_fields") @@ -180,7 +188,7 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe var jobStatus = "unknown" var jobEndTime = "null" while (jobEndTime == "null") { - response = awxClient.exchangeResource(GET, "/api/v2/jobs/${jobId}/", "") + response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/", "") val jobLaunched: JsonNode = mapper.readTree(response.body) jobStatus = jobLaunched.at("/status").asText() jobEndTime = jobLaunched.at("/finished").asText() @@ -189,12 +197,10 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe log.info("Execution of job template $job_template_name in job #$jobId finished with status ($jobStatus) for requestId $processId") - // Get job execution results (stdout) - val plainTextHeaders = mutableMapOf<String, String>() - plainTextHeaders["Content-Type"] = "text/plain ;utf-8" - response = awxClient.exchangeResource(GET, "/api/v2/jobs/${jobId}/stdout/?format=txt", "", plainTextHeaders) + // Get workflow/job execution results + val collectedOutput = extractJobRunResponse(awxClient, jobId, workflowPrefix) - setNodeOutputProperties(jobStatus.asJsonPrimitive(), response.body.asJsonPrimitive()) + setNodeOutputProperties(jobStatus.asJsonPrimitive(), collectedOutput.asJsonPrimitive()) } else { // The job template requirements were not fulfilled with the values passed in. The message below will // provide more information via the response, like the ignored_fields, or variables_needed_to_start, @@ -207,42 +213,77 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe } /** + * Extracts the response from either a job stdout call OR collects the workflow run output + */ + private fun extractJobRunResponse(awxClient: BlueprintWebClientService, jobId: String, workflowPrefix: String): String { + + // First, collect all job ID from either the job template run or the workflow nodes that ran + var jobIds : Array<String> + var collectedResponses = StringBuilder() + if (workflowPrefix.isNotEmpty()) { + var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/workflow_nodes/", "") + val jobDetails = mapper.readTree(response.body).at("/results") + jobIds = emptyArray() + for (jobDetail in jobDetails.elements()) { + jobIds = jobIds.plus( jobDetail.at("/summary_fields/job/id").asText() ) + } + } else { + jobIds = arrayOf(jobId) + } + + // Then collect the response text from the corresponding jobIds + val plainTextHeaders = mutableMapOf<String, String>() + plainTextHeaders["Content-Type"] = "text/plain ;utf-8" + for (aJobId in jobIds) { + var response = awxClient.exchangeResource(GET, "/api/v2/jobs/${aJobId}/stdout/?format=txt", "", plainTextHeaders) + collectedResponses.append("Output for job ${aJobId}:") + collectedResponses.append(response.body) + } + return collectedResponses.toString() + } + + /** * Prepares the JSON payload expected by the job template api, * by applying the overrides that were provided * and allowed by the template definition flags in jtLaunchReqs */ - private fun prepareLaunchPayload(awxClient: BlueprintWebClientService, jtLaunchReqs: JsonNode): String { + private fun prepareLaunchPayload(awxClient: BlueprintWebClientService, jtLaunchReqs: JsonNode, + isWorkflow : Boolean): String { val payload = JacksonUtils.objectMapper.createObjectNode() // Parameter defaults - val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST) - val tagsProp = getOptionalOperationInput(INPUT_TAGS) - val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS) val inventoryProp = getOptionalOperationInput(INPUT_INVENTORY) val extraArgs = getOperationInput(INPUT_EXTRA_VARS) - val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean() - if (askLimitOnLaunch && limitProp.isNotNull()) { - payload.set(INPUT_LIMIT_TO_HOST, limitProp) - } - val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean() - if (askTagsOnLaunch && tagsProp.isNotNull()) { - payload.set(INPUT_TAGS, tagsProp) - } - if (askTagsOnLaunch && skipTagsProp.isNotNull()) { - payload.set("skip_tags", skipTagsProp) + if (!isWorkflow) { + val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST) + val tagsProp = getOptionalOperationInput(INPUT_TAGS) + val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS) + + val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean() + if (askLimitOnLaunch && limitProp.isNotNull()) { + payload.set(INPUT_LIMIT_TO_HOST, limitProp) + } + val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean() + if (askTagsOnLaunch && tagsProp.isNotNull()) { + payload.set(INPUT_TAGS, tagsProp) + } + if (askTagsOnLaunch && skipTagsProp.isNotNull()) { + payload.set("skip_tags", skipTagsProp) + } } + val askInventoryOnLaunch = jtLaunchReqs.at("/ask_inventory_on_launch").asBoolean() if (askInventoryOnLaunch && inventoryProp.isNotNull()) { var inventoryKeyId = if (inventoryProp is TextNode) { - resolveInventoryIdByName(awxClient, inventoryProp!!.textValue())?.asJsonPrimitive() + resolveInventoryIdByName(awxClient, inventoryProp.textValue())?.asJsonPrimitive() } else { inventoryProp } payload.set(INPUT_INVENTORY, inventoryKeyId) } val askVariablesOnLaunch = jtLaunchReqs.at("/ask_variables_on_launch").asBoolean() - if (askVariablesOnLaunch && extraArgs != null) { + if (askVariablesOnLaunch) { payload.set("extra_vars", extraArgs) } return payload.asJsonString(false) diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt new file mode 100644 index 000000000..47b55b018 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import kotlin.reflect.KClass + + +fun <T : Any> ExecutionServiceInput.payloadAsType(clazzType: KClass<T>): T { + val actionName = this.actionIdentifiers.actionName + val requestJsonNode = this.payload.get("$actionName-request") + return requestJsonNode.asType(clazzType.java) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 408bb58ed..8759338b7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.withTimeout import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status @@ -47,6 +48,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic lateinit var interfaceName: String lateinit var operationName: String lateinit var nodeTemplateName: String + var timeout: Int = 180 var operationInputs: MutableMap<String, JsonNode> = hashMapOf() override fun getName(): String { @@ -87,6 +89,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic this.operationInputs.putAll(operationResolvedProperties) + val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT) + timeout?.let { this.timeout = timeout } + return executionRequest } @@ -118,7 +123,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { try { prepareRequestNB(executionServiceInput) - processNB(executionServiceInput) + withTimeout((timeout * 1000).toLong()) { + processNB(executionServiceInput) + } } catch (runtimeException: RuntimeException) { log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException) recoverNB(runtimeException, executionServiceInput) diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index 2a14be216..6bee17f4b 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -44,10 +44,8 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, - executionServiceInput, deferredOutput) - return deferredOutput.await() + return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput) } } @@ -60,35 +58,41 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput lateinit var workflowName: String - lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput> override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: ExecutionServiceInput, - output: CompletableDeferred<ExecutionServiceOutput>) { + input: ExecutionServiceInput): ExecutionServiceOutput { this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input this.workflowName = this.executionServiceInput.actionIdentifiers.actionName - this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<ExecutionServiceOutput>() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor.send(startMessage) + } else { + throw BluePrintProcessorException("workflow($workflowActor) actor is closed") + } + return output.await() } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { - val wfStatus = if (exception != null) { - val status = Status() - status.message = BluePrintConstants.STATUS_FAILURE - status.errorMessage = exception.message - status - } else { - val status = Status() - status.message = BluePrintConstants.STATUS_SUCCESS - status + override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + val wfStatus = Status().apply { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + val errorMessage = it.message ?: "" + bluePrintRuntimeService.getBluePrintError().addError(errorMessage) + log.error("workflow($workflowId) exception :", it) + } + message = BluePrintConstants.STATUS_FAILURE + } else { + message = BluePrintConstants.STATUS_SUCCESS + } } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt index 89732e300..b64177aab 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt @@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.putJsonElement +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService import org.slf4j.LoggerFactory @@ -37,15 +37,22 @@ open class NodeTemplateExecutionService { executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { // Get the Blueprint Context val blueprintContext = bluePrintRuntimeService.bluePrintContext() + + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) // Get the Component Name, NodeTemplate type is mapped to Component Name - val componentName = blueprintContext.nodeTemplateByName(nodeTemplateName).type + val componentName = nodeTemplate.type val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + val nodeTemplateImplementation = blueprintContext + .nodeTemplateOperationImplementation(nodeTemplateName, interfaceName, operationName) + + val timeout: Int = nodeTemplateImplementation?.timeout ?: 180 + log.info("executing node template($nodeTemplateName) component($componentName) " + - "interface($interfaceName) operation($operationName)") + "interface($interfaceName) operation($operationName) with timeout($timeout) sec.") // Get the Component Instance val plugin = BluePrintDependencyService.instance<AbstractComponentFunction>(componentName) @@ -62,9 +69,10 @@ open class NodeTemplateExecutionService { // Populate Step Meta Data val stepInputs: MutableMap<String, JsonNode> = hashMapOf() - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE, nodeTemplateName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_INTERFACE, interfaceName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_OPERATION, operationName) + stepInputs[BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE] = nodeTemplateName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_INTERFACE] = interfaceName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_OPERATION] = operationName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_TIMEOUT] = timeout.asJsonPrimitive() val stepInputData = StepData().apply { name = nodeTemplateName properties = stepInputs diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 064c196ed..ba5815bb6 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -180,6 +180,7 @@ object BluePrintConstants { const val PROPERTY_CURRENT_NODE_TEMPLATE = "current-node-template" const val PROPERTY_CURRENT_INTERFACE = "current-interface" const val PROPERTY_CURRENT_OPERATION = "current-operation" + const val PROPERTY_CURRENT_TIMEOUT = "current-timeout" const val PROPERTY_CURRENT_IMPLEMENTATION = "current-implementation" const val PROPERTY_EXECUTION_REQUEST = "execution-request" diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt index 93ba15e99..08bc6c3fd 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt @@ -242,6 +242,22 @@ fun Map<String, JsonNode>.getAsDouble(key: String): Double { return this[key]?.asDouble() ?: throw BluePrintException("couldn't find value for key($key)") } +fun Map<String, JsonNode>.getOptionalAsString(key: String): String? { + return if (this.containsKey(key)) this[key]!!.asText() else null +} + +fun Map<String, JsonNode>.getOptionalAsBoolean(key: String): Boolean? { + return if (this.containsKey(key)) this[key]!!.asBoolean() else null +} + +fun Map<String, JsonNode>.getOptionalAsInt(key: String): Int? { + return if (this.containsKey(key)) this[key]!!.asInt() else null +} + +fun Map<String, JsonNode>.getOptionalAsDouble(key: String): Double? { + return if (this.containsKey(key)) this[key]!!.asDouble() else null +} + // Checks inline fun checkEquals(value1: String?, value2: String?, lazyMessage: () -> Any): Boolean { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt index 9e1b7498e..fc796c9ed 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt @@ -33,7 +33,8 @@ enum class NodeStatus(val id: String) { READY("ready"), EXECUTING("executing"), EXECUTED("executed"), - SKIPPED("skipped") + SKIPPED("skipped"), + TERMINATED("terminated") } class Graph { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt index 066516fcc..b368c01aa 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt @@ -216,6 +216,11 @@ class BluePrintContext(val serviceTemplate: ServiceTemplate) { ?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s first InterfaceAssignment's first OperationAssignment name") } + fun nodeTemplateOperationImplementation(nodeTemplateName: String, interfaceName: String, operationName: String) + : Implementation? { + return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).implementation + } + fun nodeTemplateInterfaceOperationInputs(nodeTemplateName: String, interfaceName: String, operationName: String): MutableMap<String, JsonNode>? { return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).inputs } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 905150213..5cec3c947 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -30,13 +30,12 @@ import kotlin.coroutines.CoroutineContext interface BluePrintWorkFlowService<In, Out> { /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService] - * and workflow input [input], response will be retrieve from output [output]*/ - suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) + * and workflow input [input]*/ + suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out + suspend fun prepareWorkflowOutput(): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> @@ -91,6 +90,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP lateinit var workflowId: String + var exceptions: MutableList<Exception> = arrayListOf() + final override val coroutineContext: CoroutineContext get() = job + CoroutineName("Wf") @@ -100,7 +101,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP throw CancellationException("Workflow($workflowId) cancelled as requested") } - fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { @@ -119,13 +120,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.info("End Node Completed, processing completion message") - val bluePrintProcessorException: BluePrintProcessorException? = - if (exception != null) BluePrintProcessorException(exception) else null - - val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) + if (exception != null) exceptions.add(BluePrintProcessorException(exception)) + log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions") + val workflowOutput = prepareWorkflowOutput() workflowExecuteMessage.output.complete(workflowOutput) - channel.close(exception) + channel.close() } } } @@ -135,7 +134,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP when (message) { is WorkflowExecuteMessage<In, Out> -> { launch { - executeMessageActor(message) + try { + executeMessageActor(message) + } catch (e: Exception) { + exceptions.add(e) + } } } is WorkflowRestartMessage<In, Out> -> { @@ -153,7 +156,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } - private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { @@ -164,7 +167,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) { // Process only Next Success Node val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState)) - log.debug("Next Edges :$stateEdges") if (stateEdges.isNotEmpty()) { stateEdges.forEach { stateEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -213,7 +215,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } triggerToExecuteOrSkip(newMessage) } else { - log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)") + log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)") } } else { triggerToExecuteOrSkip(message) @@ -233,15 +235,19 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } // Update Node Completed node.status = NodeStatus.EXECUTED - log.info("Execute Node($node) -> Executed State($nodeState)") + log.info("Execute node(${node.id}) -> executed state($nodeState)") + // Check if the Node status edge is there, If not close processing + val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty() // If End Node, Send End Message if (graph.isEndNode(node)) { // Close the current channel channel.close() + } else if (!edgePresent) { + throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.") } else { val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState)) - log.debug("Skipping node($node) outgoing Edges($skippingEdges)") + log.debug("Skipping node($node)'s outgoing edges($skippingEdges)") // Process Skip Edges skippingEdges.forEach { skippingEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -266,7 +272,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) - log.info("Skip Node($node) -> Executed State($nodeState)") + log.info("Skip node(${node.id}) -> executed state($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED // Look for next possible skip nodes @@ -283,7 +289,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch { channel.close() - throw CancellationException("Workflow($workflowId) actor cancelled as requested ...") + throw CancellationException("Workflow($workflowId) actor cancelled as requested.") } /** Process each actor message received based on type **/ @@ -294,7 +300,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { readyNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } is NodeExecuteMessage<In, Out> -> { @@ -302,7 +309,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { executeNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -311,7 +320,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { skipNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -320,20 +331,12 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { restartNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } } } } } - - override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) { - log.info("Executing Graph : $graph") - this.graph = graph - this.workflowId = bluePrintRuntimeService.id() - val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) - } }
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt index 7c2466fb7..73dff9379 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt @@ -236,27 +236,27 @@ class JacksonUtils { } } - fun populatePrimitiveValues(key: String, value: Any, primitiveType: String, objectNode: ObjectNode) { + fun populatePrimitiveValues(key: String, value: JsonNode, primitiveType: String, objectNode: ObjectNode) { when (primitiveType.toLowerCase()) { BluePrintConstants.DATA_TYPE_BOOLEAN, BluePrintConstants.DATA_TYPE_INTEGER, BluePrintConstants.DATA_TYPE_FLOAT, BluePrintConstants.DATA_TYPE_DOUBLE, BluePrintConstants.DATA_TYPE_TIMESTAMP, - BluePrintConstants.DATA_TYPE_STRING-> - objectNode.set(key, value.asJsonPrimitive()) + BluePrintConstants.DATA_TYPE_STRING -> + objectNode.set(key, value) else -> throw BluePrintException("populatePrimitiveValues expected only primitive values! Received: ($value)") } } - fun populatePrimitiveValues(value: Any, primitiveType: String, arrayNode: ArrayNode) { + fun populatePrimitiveValues(value: JsonNode, primitiveType: String, arrayNode: ArrayNode) { when (primitiveType.toLowerCase()) { BluePrintConstants.DATA_TYPE_BOOLEAN, BluePrintConstants.DATA_TYPE_INTEGER, BluePrintConstants.DATA_TYPE_FLOAT, BluePrintConstants.DATA_TYPE_DOUBLE, BluePrintConstants.DATA_TYPE_TIMESTAMP, - BluePrintConstants.DATA_TYPE_STRING -> arrayNode.add(value.asJsonPrimitive()) + BluePrintConstants.DATA_TYPE_STRING -> arrayNode.add(value) else -> throw BluePrintException("populatePrimitiveValues expected only primitive values! Received: ($value)") } } @@ -300,7 +300,6 @@ class JacksonUtils { BluePrintConstants.DATA_TYPE_INTEGER -> jsonNodeFromObject(value.toInt()) BluePrintConstants.DATA_TYPE_FLOAT -> jsonNodeFromObject(value.toFloat()) BluePrintConstants.DATA_TYPE_DOUBLE -> jsonNodeFromObject(value.toDouble()) - //TODO: Verify.. I assume string type should be here.. BluePrintConstants.DATA_TYPE_STRING -> jsonNodeFromObject(value) else -> getJsonNode(value) } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index b8d8cea3e..4d97f8bc3 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -18,13 +18,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.toGraph import kotlin.test.assertNotNull @@ -36,10 +36,66 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testMultipleFlows() { + runBlocking { + coroutineScope { + val wfs = listOf("12345", "12346").map { + async { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(it), it) + assertNotNull(response, "failed to get response") + } + } + wfs.awaitAll() + } + } + } + + @Test + fun testMissingEdgeForBFailureState() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), arrayListOf("B")) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testTimeoutExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "TO", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -51,10 +107,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -68,10 +122,8 @@ class BluePrintWorkflowServiceTest { val failurePathWorkflow = TestBluePrintWorkFlowService() failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), arrayListOf("A")) - val failurePathWorkflowDeferredOutput = CompletableDeferred<String>() val failurePathWorkflowInput = "123456" - failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) - val failurePathResponse = failurePathWorkflowDeferredOutput.await() + val failurePathResponse = failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput) assertNotNull(failurePathResponse, "failed to get response") } } @@ -83,10 +135,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -98,17 +148,19 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + return mockBluePrintRuntimeService("123456") + } + + private fun mockBluePrintRuntimeService(id: String): BluePrintRuntimeService<*> { val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>() - every { bluePrintRuntimeService.id() } returns "123456" + every { bluePrintRuntimeService.id() } returns id return bluePrintRuntimeService } @@ -126,6 +178,7 @@ class BluePrintWorkflowServiceTest { class TestBluePrintWorkFlowService : AbstractBluePrintWorkFlowService<String, String>() { + val log = logger(TestBluePrintWorkFlowService::class) lateinit var simulatedState: MutableMap<String, EdgeLabel> @@ -133,6 +186,21 @@ class TestBluePrintWorkFlowService return EdgeLabel.SUCCESS } + override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: String): String { + log.info("Executing Graph : $graph") + this.graph = graph + this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<String>() + val startMessage = WorkflowExecuteMessage(input, output) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor().send(startMessage) + } else { + throw BluePrintProcessorException("workflow actor is closed for send $workflowActor") + } + return startMessage.output.await() + } + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<String, String> { return NodeExecuteMessage(node, "$node Input", "") @@ -140,23 +208,26 @@ class TestBluePrintWorkFlowService override suspend fun executeNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { -// val random = (1..10).random() * 1000 -// println("will reply in $random ms") +// val random = (1..10).random() * 100 +// log.info("workflow($workflowId) node(${node.id}) will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status +// //Simulation for timeout + if (node.id == "TO") { + withTimeout(1) { + kotlinx.coroutines.delay(2) + } + } + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { val nodeOutput = "" - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) - return nodeSkipMessage + return NodeSkipMessage(node, "$node Skip Input", nodeOutput) } override suspend fun skipNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, @@ -169,7 +240,12 @@ class TestBluePrintWorkFlowService TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { + override suspend fun prepareWorkflowOutput(): String { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + log.error("workflow($workflowId) exceptions :", it) + } + } return "Final Response" } }
\ No newline at end of file |