diff options
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java')
-rw-r--r-- | vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java new file mode 100644 index 000000000..aa94a2aa0 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java @@ -0,0 +1,135 @@ +package org.onap.vid.job.impl; + +import org.apache.commons.lang3.StringUtils; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.NextCommand; +import org.onap.vid.job.command.JobCommandFactory; +import org.onap.vid.properties.Features; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.quartz.JobExecutionContext; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.stereotype.Component; +import org.togglz.core.manager.FeatureManager; + +import java.util.Optional; +import java.util.UUID; + +import static org.onap.vid.job.Job.JobStatus.FAILED; +import static org.onap.vid.job.Job.JobStatus.STOPPED; + +@Component +public class JobWorker extends QuartzJobBean { + + private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class); + + private JobsBrokerService jobsBrokerService; + private FeatureManager featureManager; + private JobCommandFactory jobCommandFactory; + private Job.JobStatus topic; + + @Override + protected void executeInternal(JobExecutionContext context) { + Optional<Job> job; + + if (!isMsoNewApiActive()) { + return; + } + + job = pullJob(); + + while (job.isPresent()) { + Job nextJob = executeJobAndGetNext(job.get()); + pushBack(nextJob); + + job = pullJob(); + } + } + + private Optional<Job> pullJob() { + try { + return jobsBrokerService.pull(topic, UUID.randomUUID().toString()); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e); + return Optional.empty(); + } + } + + private void pushBack(Job nextJob) { + try { + jobsBrokerService.pushBack(nextJob); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e); + } + } + + protected Job executeJobAndGetNext(Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType()); + + NextCommand nextCommand = executeCommandAndGetNext(job); + + Job nextJob = setNextCommandInJob(nextCommand, job); + + return nextJob; + } + + private NextCommand executeCommandAndGetNext(Job job) { + NextCommand nextCommand; + try { + final JobCommand jobCommand = jobCommandFactory.toCommand(job); + nextCommand = jobCommand.call(); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e); + nextCommand = new NextCommand(FAILED); + } + + if (nextCommand == null) { + nextCommand = new NextCommand(STOPPED); + } + return nextCommand; + } + + private Job setNextCommandInJob(NextCommand nextCommand, Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType(), + nextCommand.getStatus(), + nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : ""); + + job.setStatus(nextCommand.getStatus()); + + if (nextCommand.getCommand() != null) { + job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData()); + } + + return job; + } + + private boolean isMsoNewApiActive() { + return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION); + } + + + //used by quartz to inject JobsBrokerService into the job + //see JobSchedulerInitializer + public void setJobsBrokerService(JobsBrokerService jobsBrokerService) { + this.jobsBrokerService = jobsBrokerService; + } + + public void setFeatureManager(FeatureManager featureManager) { + this.featureManager = featureManager; + } + + public void setJobCommandFactory(JobCommandFactory jobCommandFactory) { + this.jobCommandFactory = jobCommandFactory; + } + + public void setTopic(Job.JobStatus topic) { + this.topic = topic; + } +} |