summaryrefslogtreecommitdiffstats
path: root/so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java
diff options
context:
space:
mode:
Diffstat (limited to 'so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java')
-rw-r--r--so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java376
1 files changed, 376 insertions, 0 deletions
diff --git a/so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java b/so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java
new file mode 100644
index 0000000..c4bd210
--- /dev/null
+++ b/so-cnfm/so-cnfm-lcm/so-cnfm-lcm-bpmn-flows/src/main/java/org/onap/so/cnfm/lcm/bpmn/flows/service/JobExecutorService.java
@@ -0,0 +1,376 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.so.cnfm.lcm.bpmn.flows.service;
+
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.AS_INSTANCE_ID_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.CREATE_AS_REQUEST_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.INSTANTIATE_AS_REQUEST_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.JOB_ID_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.OCC_ID_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.CamundaVariableNameConstants.TERMINATE_AS_REQUEST_PARAM_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.CREATE_AS_WORKFLOW_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.DELETE_AS_WORKFLOW_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.INSTANTIATE_AS_WORKFLOW_NAME;
+import static org.onap.so.cnfm.lcm.bpmn.flows.Constants.TERMINATE_AS_WORKFLOW_NAME;
+import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.ERROR;
+import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED;
+import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.FINISHED_WITH_ERROR;
+import static org.onap.so.cnfm.lcm.database.beans.JobStatusEnum.IN_PROGRESS;
+import static org.slf4j.LoggerFactory.getLogger;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.onap.so.cnfm.lcm.bpmn.flows.GsonProvider;
+import org.onap.so.cnfm.lcm.bpmn.flows.exceptions.AsRequestProcessingException;
+import org.onap.so.cnfm.lcm.database.beans.AsInst;
+import org.onap.so.cnfm.lcm.database.beans.AsLcmOpOcc;
+import org.onap.so.cnfm.lcm.database.beans.AsLcmOpType;
+import org.onap.so.cnfm.lcm.database.beans.Job;
+import org.onap.so.cnfm.lcm.database.beans.JobAction;
+import org.onap.so.cnfm.lcm.database.beans.JobStatusEnum;
+import org.onap.so.cnfm.lcm.database.beans.OperationStateEnum;
+import org.onap.so.cnfm.lcm.database.beans.State;
+import org.onap.so.cnfm.lcm.database.service.DatabaseServiceProvider;
+import org.onap.so.cnfm.lcm.model.AsInstance;
+import org.onap.so.cnfm.lcm.model.CreateAsRequest;
+import org.onap.so.cnfm.lcm.model.ErrorDetails;
+import org.onap.so.cnfm.lcm.model.InstantiateAsRequest;
+import org.onap.so.cnfm.lcm.model.TerminateAsRequest;
+import org.slf4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+
+/**
+ * @author Waqas Ikram (waqas.ikram@est.tech)
+ *
+ */
+@Service
+public class JobExecutorService {
+
+ private static final Logger logger = getLogger(JobExecutorService.class);
+
+ private static final ImmutableSet<JobStatusEnum> JOB_FINISHED_STATES =
+ ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR);
+
+ private static final int SLEEP_TIME_IN_SECONDS = 5;
+
+ @Value("${so-cnfm-lcm.requesttimeout.timeoutInSeconds:300}")
+ private int timeOutInSeconds;
+
+ private final DatabaseServiceProvider databaseServiceProvider;
+ private final WorkflowExecutorService workflowExecutorService;
+ private final WorkflowQueryService workflowQueryService;
+ private final Gson gson;
+
+ @Autowired
+ public JobExecutorService(final DatabaseServiceProvider databaseServiceProvider,
+ final WorkflowExecutorService workflowExecutorService, final WorkflowQueryService workflowQueryService,
+ final GsonProvider gsonProvider) {
+ this.databaseServiceProvider = databaseServiceProvider;
+ this.workflowExecutorService = workflowExecutorService;
+ this.workflowQueryService = workflowQueryService;
+ this.gson = gsonProvider.getGson();
+ }
+
+ public AsInstance runCreateAsJob(final CreateAsRequest createAsRequest) {
+ logger.info("Starting 'Create AS' workflow job for request:\n{}", createAsRequest);
+ final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.CREATE)
+ .resourceId(createAsRequest.getAsdId()).resourceName(createAsRequest.getAsInstanceName())
+ .status(JobStatusEnum.STARTING);
+ databaseServiceProvider.addJob(newJob);
+
+ logger.info("New job created in database :\n{}", newJob);
+
+ workflowExecutorService.executeWorkflow(newJob.getJobId(), CREATE_AS_WORKFLOW_NAME,
+ getVariables(newJob.getJobId(), createAsRequest));
+
+ final ImmutablePair<String, JobStatusEnum> immutablePair =
+ waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
+
+ if (immutablePair.getRight() == null) {
+ final String message = "Failed to create AS for request: \n" + createAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+ final JobStatusEnum finalJobStatus = immutablePair.getRight();
+ final String processInstanceId = immutablePair.getLeft();
+
+ if (!FINISHED.equals(finalJobStatus)) {
+
+ final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
+ if (optional.isPresent()) {
+ final ErrorDetails errorDetails = optional.get();
+ final String message =
+ "Failed to create AS for request: \n" + createAsRequest + " due to \n" + errorDetails;
+ logger.error(message);
+ throw new AsRequestProcessingException(message, errorDetails);
+ }
+
+ final String message = "Received unexpected Job Status: " + finalJobStatus
+ + " Failed to Create AS for request: \n" + createAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ logger.debug("Will query for CreateAsResponse using processInstanceId:{}", processInstanceId);
+ final Optional<AsInstance> optional = workflowQueryService.getCreateNsResponse(processInstanceId);
+ if (optional.isEmpty()) {
+ final String message =
+ "Unable to find CreateAsReponse in Camunda History for process instance: " + processInstanceId;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+ return optional.get();
+ }
+
+ public String runInstantiateAsJob(final String asInstanceId, final InstantiateAsRequest instantiateAsRequest) {
+ final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.INSTANTIATE)
+ .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
+ databaseServiceProvider.addJob(newJob);
+ logger.info("New job created in database :\n{}", newJob);
+
+ final LocalDateTime currentDateTime = LocalDateTime.now();
+ final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.INSTANTIATE)
+ .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
+ .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
+ .isCancelPending(false).operationParams(gson.toJson(instantiateAsRequest));
+ databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
+ logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
+
+ workflowExecutorService.executeWorkflow(newJob.getJobId(), INSTANTIATE_AS_WORKFLOW_NAME,
+ getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), instantiateAsRequest));
+
+ final ImmutableSet<JobStatusEnum> jobFinishedStates =
+ ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
+ final ImmutablePair<String, JobStatusEnum> immutablePair =
+ waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
+
+ if (immutablePair.getRight() == null) {
+ final String message = "Failed to Instantiate AS for request: \n" + instantiateAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ final JobStatusEnum finalJobStatus = immutablePair.getRight();
+ if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
+ logger.info("Instantiation Job status: {}", finalJobStatus);
+ return newAsLcmOpOcc.getId();
+ }
+
+ final String message = "Received unexpected Job Status: " + finalJobStatus
+ + " Failed to instantiate AS for request: \n" + instantiateAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ public String runTerminateAsJob(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
+ doInitialTerminateChecks(asInstanceId, terminateAsRequest);
+
+ final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.TERMINATE)
+ .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
+ databaseServiceProvider.addJob(newJob);
+ logger.info("New job created in database :\n{}", newJob);
+
+ final LocalDateTime currentDateTime = LocalDateTime.now();
+ final AsLcmOpOcc newAsLcmOpOcc = new AsLcmOpOcc().id(asInstanceId).operation(AsLcmOpType.TERMINATE)
+ .operationState(OperationStateEnum.PROCESSING).stateEnteredTime(currentDateTime)
+ .startTime(currentDateTime).asInst(getAsInst(asInstanceId)).isAutoInvocation(false)
+ .isCancelPending(false).operationParams(gson.toJson(terminateAsRequest));
+ databaseServiceProvider.addAsLcmOpOcc(newAsLcmOpOcc);
+ logger.info("New AsLcmOpOcc created in database :\n{}", newAsLcmOpOcc);
+
+ workflowExecutorService.executeWorkflow(newJob.getJobId(), TERMINATE_AS_WORKFLOW_NAME,
+ getVariables(asInstanceId, newJob.getJobId(), newAsLcmOpOcc.getId(), terminateAsRequest));
+
+ final ImmutableSet<JobStatusEnum> jobFinishedStates =
+ ImmutableSet.of(FINISHED, ERROR, FINISHED_WITH_ERROR, IN_PROGRESS);
+ final ImmutablePair<String, JobStatusEnum> immutablePair =
+ waitForJobToFinish(newJob.getJobId(), jobFinishedStates);
+
+ if (immutablePair.getRight() == null) {
+ final String message =
+ "Failed to Terminate AS with id: " + asInstanceId + " for request: \n" + terminateAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ final JobStatusEnum finalJobStatus = immutablePair.getRight();
+
+ if (IN_PROGRESS.equals(finalJobStatus) || FINISHED.equals(finalJobStatus)) {
+ logger.info("Termination Job status: {}", finalJobStatus);
+ return newAsLcmOpOcc.getId();
+ }
+
+ final String message = "Received unexpected Job Status: " + finalJobStatus + " Failed to Terminate AS with id: "
+ + asInstanceId + " for request: \n" + terminateAsRequest;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ public void runDeleteAsJob(final String asInstanceId) {
+ final Job newJob = new Job().startTime(LocalDateTime.now()).jobType("AS").jobAction(JobAction.DELETE)
+ .resourceId(asInstanceId).status(JobStatusEnum.STARTING);
+ databaseServiceProvider.addJob(newJob);
+ logger.info("New job created in database :\n{}", newJob);
+
+ workflowExecutorService.executeWorkflow(newJob.getJobId(), DELETE_AS_WORKFLOW_NAME,
+ getVariables(asInstanceId, newJob.getJobId()));
+
+ final ImmutablePair<String, JobStatusEnum> immutablePair =
+ waitForJobToFinish(newJob.getJobId(), JOB_FINISHED_STATES);
+
+ if (immutablePair.getRight() == null) {
+ final String message = "Failed to Delete AS with id: " + asInstanceId;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ final JobStatusEnum finalJobStatus = immutablePair.getRight();
+ final String processInstanceId = immutablePair.getLeft();
+
+ logger.info("Delete Job status: {}", finalJobStatus);
+
+ if (!FINISHED.equals(finalJobStatus)) {
+
+ final Optional<ErrorDetails> optional = workflowQueryService.getErrorDetails(processInstanceId);
+ if (optional.isPresent()) {
+ final ErrorDetails errorDetails = optional.get();
+ final String message = "Failed to Delete AS with id: " + asInstanceId + " due to \n" + errorDetails;
+ logger.error(message);
+ throw new AsRequestProcessingException(message, errorDetails);
+ }
+
+ final String message = "Received unexpected Job Status: " + finalJobStatus
+ + " Failed to Delete AS with id: " + asInstanceId;
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+
+ logger.debug("Delete AS finished successfully ...");
+ }
+
+ private AsInst getAsInst(final String asInstId) {
+ logger.info("Getting AsInst with nsInstId: {}", asInstId);
+ final Optional<AsInst> optionalNfvoNsInst = databaseServiceProvider.getAsInst(asInstId);
+
+ if (optionalNfvoNsInst.isEmpty()) {
+ final String message = "No matching AS Instance for id: " + asInstId + " found in database.";
+ throw new AsRequestProcessingException(message);
+ }
+
+ return optionalNfvoNsInst.get();
+ }
+
+ private ImmutablePair<String, JobStatusEnum> waitForJobToFinish(final String jobId,
+ final ImmutableSet<JobStatusEnum> jobFinishedStates) {
+ try {
+ final long startTimeInMillis = System.currentTimeMillis();
+ final long timeOutTime = startTimeInMillis + TimeUnit.SECONDS.toMillis(timeOutInSeconds);
+
+ logger.info("Will wait till {} for {} job to finish", Instant.ofEpochMilli(timeOutTime).toString(), jobId);
+ JobStatusEnum currentJobStatus = null;
+ while (timeOutTime > System.currentTimeMillis()) {
+
+ final Optional<Job> optional = databaseServiceProvider.getRefreshedJob(jobId);
+
+ if (optional.isEmpty()) {
+ logger.error("Unable to find Job using jobId: {}", jobId);
+ return ImmutablePair.nullPair();
+ }
+
+ final Job job = optional.get();
+ currentJobStatus = job.getStatus();
+ logger.debug("Received job status response: \n {}", job);
+ if (jobFinishedStates.contains(currentJobStatus)) {
+ logger.info("Job finished \n {}", currentJobStatus);
+ return ImmutablePair.of(job.getProcessInstanceId(), currentJobStatus);
+ }
+
+ logger.info("Haven't received one of finish state {} yet, will try again in {} seconds",
+ jobFinishedStates, SLEEP_TIME_IN_SECONDS);
+ TimeUnit.SECONDS.sleep(SLEEP_TIME_IN_SECONDS);
+
+ }
+ logger.warn("Timeout current job status: {}", currentJobStatus);
+ return ImmutablePair.nullPair();
+ } catch (final InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ logger.error("Sleep was interrupted", interruptedException);
+ return ImmutablePair.nullPair();
+ }
+ }
+
+ private void doInitialTerminateChecks(final String asInstanceId, final TerminateAsRequest terminateAsRequest) {
+ final AsInst asInst = getAsInst(asInstanceId);
+ if (isNotInstantiated(asInst)) {
+ final String message = "TerminateAsRequest received: " + terminateAsRequest + " for asInstanceId: "
+ + asInstanceId + "\nUnable to terminate. AS Instance is already in " + State.NOT_INSTANTIATED
+ + " state." + "\nThis method can only be used with an AS instance in the " + State.INSTANTIATED
+ + " state.";
+ logger.error(message);
+ throw new AsRequestProcessingException(message);
+ }
+ }
+
+ private boolean isNotInstantiated(final AsInst asInst) {
+ return State.NOT_INSTANTIATED.equals(asInst.getStatus());
+ }
+
+ private Map<String, Object> getVariables(final String jobId, final CreateAsRequest createAsRequest) {
+ final Map<String, Object> variables = new HashMap<>();
+ variables.put(JOB_ID_PARAM_NAME, jobId);
+ variables.put(CREATE_AS_REQUEST_PARAM_NAME, createAsRequest);
+ return variables;
+ }
+
+ private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
+ final InstantiateAsRequest instantiateAsRequest) {
+ final Map<String, Object> variables = new HashMap<>();
+ variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
+ variables.put(JOB_ID_PARAM_NAME, jobId);
+ variables.put(OCC_ID_PARAM_NAME, occId);
+ variables.put(INSTANTIATE_AS_REQUEST_PARAM_NAME, instantiateAsRequest);
+ return variables;
+ }
+
+ private Map<String, Object> getVariables(final String asInstanceId, final String jobId, final String occId,
+ final TerminateAsRequest terminateAsRequest) {
+ final Map<String, Object> variables = new HashMap<>();
+ variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
+ variables.put(JOB_ID_PARAM_NAME, jobId);
+ variables.put(OCC_ID_PARAM_NAME, occId);
+ variables.put(TERMINATE_AS_REQUEST_PARAM_NAME, terminateAsRequest);
+ return variables;
+ }
+
+ private Map<String, Object> getVariables(final String asInstanceId, final String jobId) {
+ final Map<String, Object> variables = new HashMap<>();
+ variables.put(AS_INSTANCE_ID_PARAM_NAME, asInstanceId);
+ variables.put(JOB_ID_PARAM_NAME, jobId);
+ return variables;
+ }
+}