aboutsummaryrefslogtreecommitdiffstats
path: root/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
diff options
context:
space:
mode:
authorSonsino, Ofir (os0695) <os0695@intl.att.com>2018-07-10 14:20:54 +0300
committerSonsino, Ofir (os0695) <os0695@intl.att.com>2018-07-10 14:20:54 +0300
commitc72d565bb58226b20625b2bce5f0019046bee649 (patch)
tree8658e49595705b02e47ddc14afa20d6bb7123547 /vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
parentef8a6b47847012fd59ea20da21d8d3d7c4a301ed (diff)
Merge 1806 code of vid-common
Change-Id: I75d52abed4a24dfe3827d79edc4a2938726aa87a Issue-ID: VID-208 Signed-off-by: Sonsino, Ofir (os0695) <os0695@intl.att.com>
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java')
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java236
1 files changed, 236 insertions, 0 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
new file mode 100644
index 000000000..e286cc4aa
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
@@ -0,0 +1,236 @@
+package org.onap.vid.job.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.hibernate.SessionFactory;
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.exceptions.OperationNotAllowedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobsBrokerService;
+import org.onap.vid.properties.VidProperties;
+import org.onap.vid.utils.DaoUtils;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.onap.portalsdk.core.service.DataAccessService;
+import org.onap.portalsdk.core.util.SystemProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.*;
+
+@Service
+public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
+
+ static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
+
+ private final DataAccessService dataAccessService;
+
+ private final SessionFactory sessionFactory;
+ private int maxOpenedInstantiationRequestsToMso;
+ private int pollingIntervalSeconds;
+
+ @Autowired
+ public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
+ @Value("0") int maxOpenedInstantiationRequestsToMso,
+ @Value("10") int pollingIntervalSeconds) {
+ // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
+ this.dataAccessService = dataAccessService;
+ this.sessionFactory = sessionFactory;
+ this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
+ this.pollingIntervalSeconds = pollingIntervalSeconds;
+ }
+
+ @PostConstruct
+ public void configure() {
+ maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
+ pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
+ }
+
+ public void deleteAll() {
+ dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
+ }
+
+ @Override
+ public UUID add(Job job) {
+ final JobDaoImpl jobDao = castToJobDaoImpl(job);
+ jobDao.setUuid(UUID.randomUUID());
+ dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
+ return job.getUuid();
+ }
+
+ @Override
+ public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
+ JobDaoImpl daoJob;
+ int updatedEntities;
+ do {
+ String query = sqlQueryForTopic(topic);
+ List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
+ if (jobs.isEmpty()) {
+ return Optional.empty();
+ }
+
+ daoJob = jobs.get(0);
+
+ final UUID uuid = daoJob.getUuid();
+ final Integer age = daoJob.getAge();
+
+ daoJob.setTakenBy(ownerId);
+
+ // It might become that daoJob was taken and pushed-back already, before we
+ // arrived here, so we're verifying the age was not pushed forward.
+ // Age is actually forwarded upon pushBack().
+ String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
+ " job.id = :id" +
+ " and job.age = :age" +
+ " and takenBy is null";
+ updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
+ session.createQuery(hqlUpdate)
+ .setText("id", uuid.toString())
+ .setInteger("age", age)
+ .setText("takenBy", ownerId)
+ .executeUpdate());
+
+ } while (updatedEntities == 0);
+
+ return Optional.ofNullable(daoJob);
+ }
+
+ private java.sql.Timestamp nowMinusInterval() {
+ return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
+ }
+
+ private String sqlQueryForTopic(Job.JobStatus topic) {
+ switch (topic) {
+ case IN_PROGRESS:
+ return "" +
+ "select * from VID_JOB" +
+ " where" +
+ // select only non-deleted in-progress jobs
+ " JOB_STATUS = 'IN_PROGRESS'" +
+ " and TAKEN_BY is null" +
+ " and DELETED_AT is null" +
+ // give some breath, don't select jos that were recently reached
+ " and MODIFIED_DATE <= '" + nowMinusInterval() +
+ // take the oldest handled one
+ "' order by MODIFIED_DATE ASC" +
+ // select only one result
+ " limit 1";
+
+ case PENDING:
+ return "" +
+ // select only pending jobs
+ "select vid_job.* from VID_JOB " +
+ // select users have in_progress jobs
+ "left join \n" +
+ " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
+ "group by user_id) users_have_any_in_progress_job_tbl\n" +
+ "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
+ "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
+ // job is not deleted
+ " AND DELETED_AT is null and (\n" +
+ // limit in-progress to some amount
+ "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
+ "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
+ // don't take jobs from templates that already in-progress/failed
+ "and TEMPLATE_Id not in \n" +
+ "(select TEMPLATE_Id from vid_job where" +
+ " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
+ " or JOB_STATUS='IN_PROGRESS'" +
+ " or TAKEN_BY IS NOT NULL)" + " \n " +
+ // prefer older jobs, but the earlier in each bulk
+ "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
+ // select only one result
+ "limit 1";
+ default:
+ throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
+ }
+ }
+
+
+ private byte[] getUuidAsByteArray(UUID owner) {
+ ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+ bb.putLong(owner.getMostSignificantBits());
+ bb.putLong(owner.getLeastSignificantBits());
+ return bb.array();
+ }
+
+ @Override
+ public void pushBack(Job job) {
+ final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
+
+ if (remoteDaoJob == null) {
+ throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
+ }
+
+ if (remoteDaoJob.getTakenBy() == null) {
+ throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
+ }
+
+ final JobDaoImpl jobDao = castToJobDaoImpl(job);
+
+ jobDao.setTakenBy(null);
+
+ Integer age = jobDao.getAge();
+ jobDao.setAge(age + 1);
+
+ logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
+
+ dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
+ }
+
+ private JobDaoImpl castToJobDaoImpl(Job job) {
+ if (!(job instanceof JobDaoImpl)) {
+ throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
+ }
+ return (JobDaoImpl) job;
+ }
+
+ @Override
+ public Collection<Job> peek() {
+ return dataAccessService.getList(JobDaoImpl.class, null);
+ }
+
+ @Override
+ public Job peek(UUID jobId) {
+ return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
+ }
+
+ @Override
+ public void delete(UUID jobId) {
+ int updatedEntities;
+ Date now = new Date();
+
+ String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
+ " job.id = :id" +
+ " and job.status in(:pending, :stopped)" +
+ " and takenBy is null";
+
+ updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
+ session.createQuery(hqlUpdate)
+ .setTimestamp("now", now)
+ .setText("id", jobId.toString())
+ .setText("pending", Job.JobStatus.PENDING.toString())
+ .setText("stopped", Job.JobStatus.STOPPED.toString())
+ .executeUpdate());
+
+ if (updatedEntities == 0) {
+ final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
+
+ if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
+ logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
+ throw new OperationNotAllowedException("Service does not exist");
+ }
+
+ if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
+ logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
+ ", takenBy " + remoteDaoJob.getTakenBy());
+ throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
+ }
+
+ throw new OperationNotAllowedException("Service deletion failed");
+ }
+ }
+}