diff options
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl')
7 files changed, 273 insertions, 74 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java index 59f12f4c..1050ab93 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java @@ -4,16 +4,11 @@ import com.google.common.collect.ImmutableMap; import org.onap.vid.job.Job; import org.onap.vid.job.JobAdapter; import org.onap.vid.job.JobType; -import org.onap.vid.model.JobBulk; import org.onap.vid.model.JobModel; import org.springframework.stereotype.Component; -import javax.ws.rs.BadRequestException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; @Component public class JobAdapterImpl implements JobAdapter { @@ -28,44 +23,32 @@ public class JobAdapterImpl implements JobAdapter { } @Override - public JobBulk toModelBulk(List<Job> jobList) { - return new JobBulk(jobList.stream().map(this::toModel).collect(Collectors.toList())); - } - - @Override - public Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk) { - JobDaoImpl job = createJob(jobType, templateId, indexInBulk, ImmutableMap.of( - "request", request, - "userId", userId)); - job.setUserId(userId); + public Job createServiceInstantiationJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, String optimisticUniqueServiceInstanceName, Integer indexInBulk){ + JobDaoImpl job = createJob(jobType, Job.JobStatus.PENDING , userId); + job.setSharedData(new JobSharedData(job.getUuid(), userId, request)); + job.setTypeAndData(jobType, ImmutableMap.of( + "optimisticUniqueServiceInstanceName", optimisticUniqueServiceInstanceName + )); + job.setTemplateId(templateId); + job.setIndexInBulk(indexInBulk); return job; } @Override - public List<Job> createBulkOfJobs(Map<String, Object> bulkRequest) { - int count; - JobType jobType; - - try { - count = (Integer) bulkRequest.get("count"); - jobType = JobType.valueOf((String) bulkRequest.get("type")); - } catch (Exception exception) { - throw new BadRequestException(exception); - } - List<Job> jobList = new ArrayList<>(count + 1); - UUID templateId = UUID.randomUUID(); - for (int i = 0; i < count; i++) { - jobList.add(createJob(jobType, templateId, i, bulkRequest)); - } - return jobList; + public Job createChildJob(JobType jobType, Job.JobStatus jobStatus, AsyncJobRequest request, JobSharedData parentSharedData, Map<String, Object> jobData) { + JobDaoImpl job = createJob(jobType, jobStatus , parentSharedData.getUserId()); + job.setSharedData(new JobSharedData(job.getUuid(), request, parentSharedData)); + job.setTypeAndData(jobType, jobData); + return job; } - private JobDaoImpl createJob(JobType jobType, UUID templateId, Integer indexInBulk, Map<String, Object> data) { + protected JobDaoImpl createJob(JobType jobType, Job.JobStatus jobStatus, String userId) { JobDaoImpl job = new JobDaoImpl(); - job.setStatus(Job.JobStatus.PENDING); - job.setTypeAndData(jobType, data); - job.setTemplateId(templateId); - job.setIndexInBulk(indexInBulk); + job.setTypeAndData(jobType, null); + job.setStatus(jobStatus); + job.setUuid(UUID.randomUUID()); + job.setUserId(userId); return job; } + } diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java index 1ff1296c..b301a2fb 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java @@ -3,19 +3,31 @@ package org.onap.vid.job.impl; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.MoreObjects; +import org.hibernate.annotations.DynamicUpdate; +import org.hibernate.annotations.SelectBeforeUpdate; import org.hibernate.annotations.Type; import org.onap.vid.exceptions.GenericUncheckedException; import org.onap.vid.job.Job; +import org.onap.vid.job.JobException; import org.onap.vid.job.JobType; import org.onap.vid.model.VidBaseEntity; import javax.persistence.*; import java.io.IOException; -import java.util.*; - +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +/* + The following 2 annotations let hibernate to update only fields that actually have been changed. + DynamicUpdate tell hibernate to update only dirty fields. + SelectBeforeUpdate is needed since during update the entity is detached (get and update are in different sessions) +*/ +@DynamicUpdate() +@SelectBeforeUpdate() @Entity @Table(name = "vid_job") public class JobDaoImpl extends VidBaseEntity implements Job { @@ -23,7 +35,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job { private static ObjectMapper objectMapper = new ObjectMapper(); private Job.JobStatus status; private JobType type; - private Map<JobType, Map<String, Object>> data = new TreeMap<>(); + private JobData data = new JobData(); private UUID templateId; private UUID uuid; private String takenBy; @@ -83,16 +95,25 @@ public class JobDaoImpl extends VidBaseEntity implements Job { public void setDataRaw(String data) { try { - this.data = objectMapper.readValue(data, new TypeReference<Map<JobType, Map<String, Object>>>() { - }); + this.data = objectMapper.readValue(data, JobData.class); } catch (IOException e) { - throw new GenericUncheckedException(e); + throw new JobException("Error parsing job's data", uuid, e); } } @Transient public Map<String, Object> getData() { - return data.get(getType()); + return data.getCommandData().get(getType()); + } + + public void setSharedData(JobSharedData sharedData) { + this.data.setSharedData(sharedData); + } + + @Override + @Transient + public JobSharedData getSharedData() { + return this.data.getSharedData(); } @Override @@ -100,7 +121,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job { // *add* the data to map, // then change state to given type this.type = jobType; - this.data.put(jobType, data); + this.data.getCommandData().put(jobType, data); } @Column(name = "TAKEN_BY") @@ -123,6 +144,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job { this.templateId = templateId; } + @Override @Column(name="INDEX_IN_BULK") public Integer getIndexInBulk() { return indexInBulk; diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java new file mode 100644 index 00000000..0b7a6b7c --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java @@ -0,0 +1,54 @@ +package org.onap.vid.job.impl; + +import org.onap.vid.job.JobType; + +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +public class JobData { + + private TreeMap<JobType, Map<String, Object>> commandData; + private JobSharedData sharedData; + + public JobData() { + commandData = new TreeMap<>(); + sharedData = new JobSharedData(); + } + + public JobData(TreeMap<JobType, Map<String, Object>> commandData, JobSharedData sharedData) { + this.commandData = commandData; + this.sharedData = sharedData; + } + + public TreeMap<JobType, Map<String, Object>> getCommandData() { + return commandData; + } + + public void setCommandData(TreeMap<JobType, Map<String, Object>> commandData) { + this.commandData = commandData; + } + + public JobSharedData getSharedData() { + return sharedData; + } + + public void setSharedData(JobSharedData sharedData) { + this.sharedData = sharedData; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof JobData)) return false; + JobData jobData = (JobData) o; + return Objects.equals(getCommandData(), jobData.getCommandData()) && + Objects.equals(getSharedData(), jobData.getSharedData()); + } + + @Override + public int hashCode() { + + return Objects.hash(getCommandData(), getSharedData()); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java index a5070fbd..59b2f250 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java @@ -1,12 +1,12 @@ package org.onap.vid.job.impl; import com.google.common.collect.ImmutableMap; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; import org.onap.vid.exceptions.GenericUncheckedException; import org.onap.vid.job.Job; import org.onap.vid.job.JobsBrokerService; import org.onap.vid.job.command.JobCommandFactory; import org.onap.vid.properties.Features; -import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; import org.quartz.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; @@ -46,7 +46,9 @@ public class JobSchedulerInitializer { return; } scheduleJobWorker(Job.JobStatus.PENDING, 1); + scheduleJobWorker(Job.JobStatus.CREATING, 1); scheduleJobWorker(Job.JobStatus.IN_PROGRESS, 1); + scheduleJobWorker(Job.JobStatus.RESOURCE_IN_PROGRESS, 1); } private void scheduleJobWorker(Job.JobStatus topic, int intervalInSeconds) { diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java new file mode 100644 index 00000000..8f1e4573 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java @@ -0,0 +1,84 @@ +package org.onap.vid.job.impl; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.onap.vid.job.JobAdapter; + +import java.util.Objects; +import java.util.UUID; + +public class JobSharedData { + + protected UUID jobUuid; + protected String userId; + protected Class requestType; + protected UUID rootJobId; + + @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, property="class") + protected JobAdapter.AsyncJobRequest request; + + public JobSharedData() { + } + + public JobSharedData(UUID jobUuid, String userId, JobAdapter.AsyncJobRequest request) { + this.jobUuid = jobUuid; + this.userId = userId; + this.requestType = request.getClass(); + this.request = request; + this.rootJobId = jobUuid; + } + + public JobSharedData(UUID jobUuid, JobAdapter.AsyncJobRequest request, JobSharedData parentData) { + this(jobUuid, parentData.getUserId(), request); + rootJobId = parentData.getRootJobId() != null ? parentData.getRootJobId() : parentData.getJobUuid(); + } + + + public UUID getJobUuid() { + return jobUuid; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public Class getRequestType() { + return requestType; + } + + public void setRequestType(Class requestType) { + this.requestType = requestType; + } + + public JobAdapter.AsyncJobRequest getRequest() { + return request; + } + + public void setRequest(JobAdapter.AsyncJobRequest request) { + this.request = request; + } + + public UUID getRootJobId() { + return rootJobId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof JobSharedData)) return false; + JobSharedData that = (JobSharedData) o; + return Objects.equals(getJobUuid(), that.getJobUuid()) && + Objects.equals(getUserId(), that.getUserId()) && + Objects.equals(getRequestType(), that.getRequestType()) && + Objects.equals(getRootJobId(), that.getRootJobId()) && + Objects.equals(getRequest(), that.getRequest()); + } + + @Override + public int hashCode() { + return Objects.hash(getJobUuid(), getUserId(), getRequestType(), getRootJobId(), getRequest()); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java index aa94a2aa..36ec4314 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java @@ -1,13 +1,11 @@ package org.onap.vid.job.impl; import org.apache.commons.lang3.StringUtils; -import org.onap.vid.job.Job; -import org.onap.vid.job.JobCommand; -import org.onap.vid.job.JobsBrokerService; -import org.onap.vid.job.NextCommand; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.onap.vid.job.*; import org.onap.vid.job.command.JobCommandFactory; import org.onap.vid.properties.Features; -import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; import org.quartz.JobExecutionContext; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Component; @@ -52,6 +50,8 @@ public class JobWorker extends QuartzJobBean { return jobsBrokerService.pull(topic, UUID.randomUUID().toString()); } catch (Exception e) { LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e); + tryMutingJobFromException(e); + return Optional.empty(); } } @@ -72,9 +72,7 @@ public class JobWorker extends QuartzJobBean { NextCommand nextCommand = executeCommandAndGetNext(job); - Job nextJob = setNextCommandInJob(nextCommand, job); - - return nextJob; + return setNextCommandInJob(nextCommand, job); } private NextCommand executeCommandAndGetNext(Job job) { @@ -83,7 +81,7 @@ public class JobWorker extends QuartzJobBean { final JobCommand jobCommand = jobCommandFactory.toCommand(job); nextCommand = jobCommand.call(); } catch (Exception e) { - LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e); + LOGGER.error("error while executing job from queue: {}", e); nextCommand = new NextCommand(FAILED); } @@ -114,6 +112,25 @@ public class JobWorker extends QuartzJobBean { return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION); } + private void tryMutingJobFromException(Exception e) { + // If there's JobException in the stack, read job uuid from + // the exception, and mute it in DB. + final int indexOfJobException = + ExceptionUtils.indexOfThrowable(e, JobException.class); + + if (indexOfJobException >= 0) { + try { + final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException); + LOGGER.info(EELFLoggerDelegate.debugLogger, "muting job: {} ({})", jobException.getJobUuid(), jobException.toString()); + final boolean success = jobsBrokerService.mute(jobException.getJobUuid()); + if (!success) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job {}", jobException.getJobUuid()); + } + } catch (Exception e1) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job: {}", e1, e1); + } + } + } //used by quartz to inject JobsBrokerService into the job //see JobSchedulerInitializer diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java index e286cc4a..e8747879 100644 --- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java @@ -2,15 +2,15 @@ package org.onap.vid.job.impl; import org.apache.commons.lang3.StringUtils; import org.hibernate.SessionFactory; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.onap.portalsdk.core.service.DataAccessService; +import org.onap.portalsdk.core.util.SystemProperties; import org.onap.vid.exceptions.GenericUncheckedException; import org.onap.vid.exceptions.OperationNotAllowedException; import org.onap.vid.job.Job; import org.onap.vid.job.JobsBrokerService; import org.onap.vid.properties.VidProperties; import org.onap.vid.utils.DaoUtils; -import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; -import org.onap.portalsdk.core.service.DataAccessService; -import org.onap.portalsdk.core.util.SystemProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -21,6 +21,8 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; +import static org.onap.vid.job.Job.JobStatus.CREATING; + @Service public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { @@ -56,7 +58,6 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { @Override public UUID add(Job job) { final JobDaoImpl jobDao = castToJobDaoImpl(job); - jobDao.setUuid(UUID.randomUUID()); dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); return job.getUuid(); } @@ -102,23 +103,30 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds)); } + private String selectQueryByJobStatus(Job.JobStatus topic){ + + String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'"); + return "" + + "select * from VID_JOB" + + " where" + + // select only non-deleted in-progress jobs + " JOB_STATUS = '" + topic + "'" + + " and TAKEN_BY is null" + + " and DELETED_AT is null" + + // give some breath, don't select jos that were recently reached + intervalCondition + + // take the oldest handled one + " order by MODIFIED_DATE ASC" + + // select only one result + " limit 1"; + } + private String sqlQueryForTopic(Job.JobStatus topic) { switch (topic) { case IN_PROGRESS: - return "" + - "select * from VID_JOB" + - " where" + - // select only non-deleted in-progress jobs - " JOB_STATUS = 'IN_PROGRESS'" + - " and TAKEN_BY is null" + - " and DELETED_AT is null" + - // give some breath, don't select jos that were recently reached - " and MODIFIED_DATE <= '" + nowMinusInterval() + - // take the oldest handled one - "' order by MODIFIED_DATE ASC" + - // select only one result - " limit 1"; - + case RESOURCE_IN_PROGRESS: + case CREATING: + return selectQueryByJobStatus(topic); case PENDING: return "" + // select only pending jobs @@ -137,9 +145,10 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { // don't take jobs from templates that already in-progress/failed "and TEMPLATE_Id not in \n" + "(select TEMPLATE_Id from vid_job where" + + " TEMPLATE_Id IS NOT NULL and("+ " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted " or JOB_STATUS='IN_PROGRESS'" + - " or TAKEN_BY IS NOT NULL)" + " \n " + + " or TAKEN_BY IS NOT NULL))" + " \n " + // prefer older jobs, but the earlier in each bulk "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " + // select only one result @@ -233,4 +242,32 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { throw new OperationNotAllowedException("Service deletion failed"); } } + + @Override + public boolean mute(UUID jobId) { + if (jobId == null) { + return false; + } + + final String prefix = "DUMP"; + int updatedEntities; + + // Changing the topic (i.e. `job.status`) makes the job non-fetchable. + String hqlUpdate = "" + + "update JobDaoImpl job set" + + " job.status = concat('" + prefix + "_', job.status)," + + // empty `takenBy`, because some logics treat taken as in-progress + " takenBy = null" + + " where " + + " job.id = :id" + + // if prefix already on the topic -- no need to do it twice. + " and job.status NOT LIKE '" + prefix + "\\_%'"; + + updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> + session.createQuery(hqlUpdate) + .setText("id", jobId.toString()) + .executeUpdate()); + + return updatedEntities != 0; + } } |