From c72d565bb58226b20625b2bce5f0019046bee649 Mon Sep 17 00:00:00 2001 From: "Sonsino, Ofir (os0695)" Date: Tue, 10 Jul 2018 14:20:54 +0300 Subject: Merge 1806 code of vid-common Change-Id: I75d52abed4a24dfe3827d79edc4a2938726aa87a Issue-ID: VID-208 Signed-off-by: Sonsino, Ofir (os0695) --- .../java/org/onap/vid/job/impl/JobAdapterImpl.java | 72 +++++++ .../java/org/onap/vid/job/impl/JobDaoImpl.java | 192 +++++++++++++++++ .../onap/vid/job/impl/JobSchedulerInitializer.java | 76 +++++++ .../main/java/org/onap/vid/job/impl/JobWorker.java | 135 ++++++++++++ .../job/impl/JobsBrokerServiceInDatabaseImpl.java | 236 +++++++++++++++++++++ 5 files changed, 711 insertions(+) create mode 100644 vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java create mode 100644 vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java create mode 100644 vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java create mode 100644 vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java create mode 100644 vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl') diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java new file mode 100644 index 00000000..77e1dd2c --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java @@ -0,0 +1,72 @@ +package org.onap.vid.job.impl; + +import com.google.common.collect.ImmutableMap; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobAdapter; +import org.onap.vid.job.JobType; +import org.onap.vid.model.JobBulk; +import org.onap.vid.model.JobModel; +import org.springframework.stereotype.Component; + +import javax.ws.rs.BadRequestException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +@Component +public class JobAdapterImpl implements JobAdapter { + + @Override + public JobModel toModel(Job job) { + JobModel jobModel = new JobModel(); + jobModel.setUuid(job.getUuid()); + jobModel.setStatus(job.getStatus()); + jobModel.setTemplateId(job.getTemplateId()); + return jobModel; + } + + @Override + public JobBulk toModelBulk(List jobList) { + return new JobBulk(jobList.stream().map(this::toModel).collect(Collectors.toList())); + } + + @Override + public Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk){ + JobDaoImpl job = new JobDaoImpl(); + job.setStatus(Job.JobStatus.PENDING); + job.setTypeAndData(jobType, ImmutableMap.of( + "request", request, + "userId", userId)); + job.setTemplateId(templateId); + job.setIndexInBulk(indexInBulk); + job.setUserId(userId); + return job; + } + + @Override + public List createBulkOfJobs(Map bulkRequest) { + int count; + JobType jobType; + + try { + count = (Integer) bulkRequest.get("count"); + jobType = JobType.valueOf((String) bulkRequest.get("type")); + } catch (Exception exception) { + throw new BadRequestException(exception); + } + List jobList = new ArrayList<>(count + 1); + UUID templateId = UUID.randomUUID(); + for (int i = 0; i < count; i++) { + Job child = new JobDaoImpl(); + child.setTypeAndData(jobType, bulkRequest); + child.setStatus(Job.JobStatus.PENDING); + child.setTemplateId(templateId); + child.setIndexInBulk(i); + jobList.add(child); + } + return jobList; + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java new file mode 100644 index 00000000..1ff1296c --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java @@ -0,0 +1,192 @@ +package org.onap.vid.job.impl; + + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.MoreObjects; +import org.hibernate.annotations.Type; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobType; +import org.onap.vid.model.VidBaseEntity; + +import javax.persistence.*; +import java.io.IOException; +import java.util.*; + +@Entity +@Table(name = "vid_job") +public class JobDaoImpl extends VidBaseEntity implements Job { + + private static ObjectMapper objectMapper = new ObjectMapper(); + private Job.JobStatus status; + private JobType type; + private Map> data = new TreeMap<>(); + private UUID templateId; + private UUID uuid; + private String takenBy; + private String userId; + private Integer age = 0; + private Integer indexInBulk = 0; + private Date deletedAt; + + @Id + @Column(name = "JOB_ID", columnDefinition = "CHAR(36)") + @Type(type="org.hibernate.type.UUIDCharType") + public UUID getUuid() { + return uuid; + } + + public void setUuid(UUID uuid) { + this.uuid = uuid; + } + + //we use uuid instead id. So making id Transient + @Override + @Transient + @JsonIgnore + public Long getId() { + return this.getUuid().getLeastSignificantBits(); + } + + @Column(name = "JOB_STATUS") + @Enumerated(EnumType.STRING) + public Job.JobStatus getStatus() { + return status; + } + + public void setStatus(Job.JobStatus status) { + this.status = status; + } + + @Enumerated(EnumType.STRING) + @Column(name = "JOB_TYPE") + public JobType getType() { + return type; + } + + public void setType(JobType type) { + this.type = type; + } + + //the columnDefinition is relevant only for UT + @Column(name = "JOB_DATA", columnDefinition = "VARCHAR(30000)") + public String getDataRaw() { + try { + return objectMapper.writeValueAsString(data); + } catch (JsonProcessingException e) { + throw new GenericUncheckedException(e); + } + } + + public void setDataRaw(String data) { + try { + this.data = objectMapper.readValue(data, new TypeReference>>() { + }); + } catch (IOException e) { + throw new GenericUncheckedException(e); + } + } + + @Transient + public Map getData() { + return data.get(getType()); + } + + @Override + public void setTypeAndData(JobType jobType, Map data) { + // *add* the data to map, + // then change state to given type + this.type = jobType; + this.data.put(jobType, data); + } + + @Column(name = "TAKEN_BY") + public String getTakenBy() { + return takenBy; + } + + public void setTakenBy(String takenBy) { + this.takenBy = takenBy; + } + + @Type(type="org.hibernate.type.UUIDCharType") + @Column(name = "TEMPLATE_ID", columnDefinition = "CHAR(36)") + public UUID getTemplateId() { + return templateId; + } + + @Override + public void setTemplateId(UUID templateId) { + this.templateId = templateId; + } + + @Column(name="INDEX_IN_BULK") + public Integer getIndexInBulk() { + return indexInBulk; + } + + @Override + public void setIndexInBulk(Integer indexInBulk) { + this.indexInBulk = indexInBulk; + } + + @Column(name="USER_ID") + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + @Column(name="AGE") + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } + + @Column(name="DELETED_AT") + public Date getDeletedAt() { + return deletedAt; + } + + public void setDeletedAt(Date deletedAt) { + this.deletedAt = deletedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof JobDaoImpl)) return false; + JobDaoImpl daoJob = (JobDaoImpl) o; + return Objects.equals(getUuid(), daoJob.getUuid()); + } + + @Override + public int hashCode() { + return Objects.hash(getUuid()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("type", type) + .add("templateId", templateId) + .add("uuid", uuid) + .add("takenBy", takenBy) + .add("userId", userId) + .add("age", age) + .add("created", created) + .add("modified", modified) + .add("deletedAt", deletedAt) + .add("data", data) + .toString(); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java new file mode 100644 index 00000000..a5070fbd --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java @@ -0,0 +1,76 @@ +package org.onap.vid.job.impl; + +import com.google.common.collect.ImmutableMap; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.command.JobCommandFactory; +import org.onap.vid.properties.Features; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Component; +import org.togglz.core.manager.FeatureManager; + +import javax.annotation.PostConstruct; + +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; + +@Component +public class JobSchedulerInitializer { + + private JobsBrokerService jobsBrokerService; + private SchedulerFactoryBean schedulerFactoryBean; + private FeatureManager featureManager; + private JobCommandFactory jobCommandFactory; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobSchedulerInitializer.class); + + @Autowired + public JobSchedulerInitializer( + JobsBrokerService jobsBrokerService, + SchedulerFactoryBean schedulerFactoryBean, + FeatureManager featureManager, + JobCommandFactory JobCommandFactory + ) { + this.jobsBrokerService = jobsBrokerService; + this.schedulerFactoryBean = schedulerFactoryBean; + this.featureManager = featureManager; + this.jobCommandFactory = JobCommandFactory; + + } + + @PostConstruct + public void init() { + if (!featureManager.isActive(Features.FLAG_ASYNC_JOBS)) { + return; + } + scheduleJobWorker(Job.JobStatus.PENDING, 1); + scheduleJobWorker(Job.JobStatus.IN_PROGRESS, 1); + } + + private void scheduleJobWorker(Job.JobStatus topic, int intervalInSeconds) { + Scheduler scheduler = schedulerFactoryBean.getScheduler(); + JobDetail jobDetail = JobBuilder.newJob().ofType(JobWorker.class) + .withIdentity("AsyncWorkersJob" + topic) + .withDescription("Job that run async worker for " + topic) + .setJobData(new JobDataMap(ImmutableMap.of( + "jobsBrokerService", jobsBrokerService, + "jobCommandFactory", jobCommandFactory, + "featureManager", featureManager, + "topic", topic + ))) + .build(); + Trigger asyncWorkerTrigger = TriggerBuilder.newTrigger().forJob(jobDetail) + .withIdentity("AsyncWorkersTrigger" + topic) + .withDescription("Trigger to run async worker for " + topic) + .withSchedule(simpleSchedule().repeatForever().withIntervalInSeconds(intervalInSeconds)) + .build(); + try { + scheduler.scheduleJob(jobDetail, asyncWorkerTrigger); + } catch (SchedulerException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Failed to schedule trigger for async worker jobs: {}", e.getMessage()); + throw new GenericUncheckedException(e); + } + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java new file mode 100644 index 00000000..aa94a2aa --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java @@ -0,0 +1,135 @@ +package org.onap.vid.job.impl; + +import org.apache.commons.lang3.StringUtils; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.NextCommand; +import org.onap.vid.job.command.JobCommandFactory; +import org.onap.vid.properties.Features; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.quartz.JobExecutionContext; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.stereotype.Component; +import org.togglz.core.manager.FeatureManager; + +import java.util.Optional; +import java.util.UUID; + +import static org.onap.vid.job.Job.JobStatus.FAILED; +import static org.onap.vid.job.Job.JobStatus.STOPPED; + +@Component +public class JobWorker extends QuartzJobBean { + + private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class); + + private JobsBrokerService jobsBrokerService; + private FeatureManager featureManager; + private JobCommandFactory jobCommandFactory; + private Job.JobStatus topic; + + @Override + protected void executeInternal(JobExecutionContext context) { + Optional job; + + if (!isMsoNewApiActive()) { + return; + } + + job = pullJob(); + + while (job.isPresent()) { + Job nextJob = executeJobAndGetNext(job.get()); + pushBack(nextJob); + + job = pullJob(); + } + } + + private Optional pullJob() { + try { + return jobsBrokerService.pull(topic, UUID.randomUUID().toString()); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e); + return Optional.empty(); + } + } + + private void pushBack(Job nextJob) { + try { + jobsBrokerService.pushBack(nextJob); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e); + } + } + + protected Job executeJobAndGetNext(Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType()); + + NextCommand nextCommand = executeCommandAndGetNext(job); + + Job nextJob = setNextCommandInJob(nextCommand, job); + + return nextJob; + } + + private NextCommand executeCommandAndGetNext(Job job) { + NextCommand nextCommand; + try { + final JobCommand jobCommand = jobCommandFactory.toCommand(job); + nextCommand = jobCommand.call(); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e); + nextCommand = new NextCommand(FAILED); + } + + if (nextCommand == null) { + nextCommand = new NextCommand(STOPPED); + } + return nextCommand; + } + + private Job setNextCommandInJob(NextCommand nextCommand, Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType(), + nextCommand.getStatus(), + nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : ""); + + job.setStatus(nextCommand.getStatus()); + + if (nextCommand.getCommand() != null) { + job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData()); + } + + return job; + } + + private boolean isMsoNewApiActive() { + return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION); + } + + + //used by quartz to inject JobsBrokerService into the job + //see JobSchedulerInitializer + public void setJobsBrokerService(JobsBrokerService jobsBrokerService) { + this.jobsBrokerService = jobsBrokerService; + } + + public void setFeatureManager(FeatureManager featureManager) { + this.featureManager = featureManager; + } + + public void setJobCommandFactory(JobCommandFactory jobCommandFactory) { + this.jobCommandFactory = jobCommandFactory; + } + + public void setTopic(Job.JobStatus topic) { + this.topic = topic; + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java new file mode 100644 index 00000000..e286cc4a --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java @@ -0,0 +1,236 @@ +package org.onap.vid.job.impl; + +import org.apache.commons.lang3.StringUtils; +import org.hibernate.SessionFactory; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.exceptions.OperationNotAllowedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.properties.VidProperties; +import org.onap.vid.utils.DaoUtils; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.onap.portalsdk.core.service.DataAccessService; +import org.onap.portalsdk.core.util.SystemProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.*; + +@Service +public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { + + static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class); + + private final DataAccessService dataAccessService; + + private final SessionFactory sessionFactory; + private int maxOpenedInstantiationRequestsToMso; + private int pollingIntervalSeconds; + + @Autowired + public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory, + @Value("0") int maxOpenedInstantiationRequestsToMso, + @Value("10") int pollingIntervalSeconds) { + // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration + this.dataAccessService = dataAccessService; + this.sessionFactory = sessionFactory; + this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso; + this.pollingIntervalSeconds = pollingIntervalSeconds; + } + + @PostConstruct + public void configure() { + maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS)); + pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS)); + } + + public void deleteAll() { + dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null); + } + + @Override + public UUID add(Job job) { + final JobDaoImpl jobDao = castToJobDaoImpl(job); + jobDao.setUuid(UUID.randomUUID()); + dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); + return job.getUuid(); + } + + @Override + public Optional pull(Job.JobStatus topic, String ownerId) { + JobDaoImpl daoJob; + int updatedEntities; + do { + String query = sqlQueryForTopic(topic); + List jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null); + if (jobs.isEmpty()) { + return Optional.empty(); + } + + daoJob = jobs.get(0); + + final UUID uuid = daoJob.getUuid(); + final Integer age = daoJob.getAge(); + + daoJob.setTakenBy(ownerId); + + // It might become that daoJob was taken and pushed-back already, before we + // arrived here, so we're verifying the age was not pushed forward. + // Age is actually forwarded upon pushBack(). + String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " + + " job.id = :id" + + " and job.age = :age" + + " and takenBy is null"; + updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> + session.createQuery(hqlUpdate) + .setText("id", uuid.toString()) + .setInteger("age", age) + .setText("takenBy", ownerId) + .executeUpdate()); + + } while (updatedEntities == 0); + + return Optional.ofNullable(daoJob); + } + + private java.sql.Timestamp nowMinusInterval() { + return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds)); + } + + private String sqlQueryForTopic(Job.JobStatus topic) { + switch (topic) { + case IN_PROGRESS: + return "" + + "select * from VID_JOB" + + " where" + + // select only non-deleted in-progress jobs + " JOB_STATUS = 'IN_PROGRESS'" + + " and TAKEN_BY is null" + + " and DELETED_AT is null" + + // give some breath, don't select jos that were recently reached + " and MODIFIED_DATE <= '" + nowMinusInterval() + + // take the oldest handled one + "' order by MODIFIED_DATE ASC" + + // select only one result + " limit 1"; + + case PENDING: + return "" + + // select only pending jobs + "select vid_job.* from VID_JOB " + + // select users have in_progress jobs + "left join \n" + + " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" + + "group by user_id) users_have_any_in_progress_job_tbl\n" + + "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " + + "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" + + // job is not deleted + " AND DELETED_AT is null and (\n" + + // limit in-progress to some amount + "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" + + "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " + + // don't take jobs from templates that already in-progress/failed + "and TEMPLATE_Id not in \n" + + "(select TEMPLATE_Id from vid_job where" + + " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted + " or JOB_STATUS='IN_PROGRESS'" + + " or TAKEN_BY IS NOT NULL)" + " \n " + + // prefer older jobs, but the earlier in each bulk + "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " + + // select only one result + "limit 1"; + default: + throw new GenericUncheckedException("Unsupported topic to pull from: " + topic); + } + } + + + private byte[] getUuidAsByteArray(UUID owner) { + ByteBuffer bb = ByteBuffer.wrap(new byte[16]); + bb.putLong(owner.getMostSignificantBits()); + bb.putLong(owner.getLeastSignificantBits()); + return bb.array(); + } + + @Override + public void pushBack(Job job) { + final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null); + + if (remoteDaoJob == null) { + throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()"); + } + + if (remoteDaoJob.getTakenBy() == null) { + throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back."); + } + + final JobDaoImpl jobDao = castToJobDaoImpl(job); + + jobDao.setTakenBy(null); + + Integer age = jobDao.getAge(); + jobDao.setAge(age + 1); + + logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType()); + + dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); + } + + private JobDaoImpl castToJobDaoImpl(Job job) { + if (!(job instanceof JobDaoImpl)) { + throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass()); + } + return (JobDaoImpl) job; + } + + @Override + public Collection peek() { + return dataAccessService.getList(JobDaoImpl.class, null); + } + + @Override + public Job peek(UUID jobId) { + return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null); + } + + @Override + public void delete(UUID jobId) { + int updatedEntities; + Date now = new Date(); + + String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " + + " job.id = :id" + + " and job.status in(:pending, :stopped)" + + " and takenBy is null"; + + updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> + session.createQuery(hqlUpdate) + .setTimestamp("now", now) + .setText("id", jobId.toString()) + .setText("pending", Job.JobStatus.PENDING.toString()) + .setText("stopped", Job.JobStatus.STOPPED.toString()) + .executeUpdate()); + + if (updatedEntities == 0) { + final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null); + + if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) { + logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId); + throw new OperationNotAllowedException("Service does not exist"); + } + + if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) { + logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() + + ", takenBy " + remoteDaoJob.getTakenBy()); + throw new OperationNotAllowedException("Service status does not allow deletion from the queue"); + } + + throw new OperationNotAllowedException("Service deletion failed"); + } + } +} -- cgit 1.2.3-korg