aboutsummaryrefslogtreecommitdiffstats
path: root/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
diff options
context:
space:
mode:
authorEinat Vinouze <einat.vinouze@intl.att.com>2019-07-16 17:17:36 +0300
committerIttay Stern <ittay.stern@att.com>2019-07-30 06:01:44 +0300
commite601bbdc43bae9a08e2e10c5139a6f76b47860d7 (patch)
tree1913f0b369ead3f2ea5557e5649d8281eca9871c /vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
parent76c6ee4a697617ec4cdee2f3b48bc83136c858c5 (diff)
Implant vid-app-common org.onap.vid.job (main and test)
Issue-ID: VID-378 Change-Id: I41b0bdc2c4e3635f3f3319b1cd63cefc61912dfc Signed-off-by: Einat Vinouze <einat.vinouze@intl.att.com> Signed-off-by: Ittay Stern <ittay.stern@att.com>
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java')
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java144
1 files changed, 102 insertions, 42 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
index 59ca43743..74a729494 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ package org.onap.vid.job.impl;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.SessionFactory;
+import org.jetbrains.annotations.NotNull;
import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.onap.portalsdk.core.service.DataAccessService;
import org.onap.portalsdk.core.util.SystemProperties;
@@ -30,18 +31,19 @@ import org.onap.vid.exceptions.OperationNotAllowedException;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobsBrokerService;
import org.onap.vid.properties.VidProperties;
+import org.onap.vid.services.VersionService;
import org.onap.vid.utils.DaoUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
-import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
+import java.util.stream.Collectors;
-import static org.onap.vid.job.Job.JobStatus.CREATING;
+import static org.onap.vid.job.Job.JobStatus.*;
@Service
public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
@@ -54,15 +56,20 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
private int maxOpenedInstantiationRequestsToMso;
private int pollingIntervalSeconds;
+ private final VersionService versionService;
+
@Autowired
- public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
+ public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService,
+ SessionFactory sessionFactory,
@Value("0") int maxOpenedInstantiationRequestsToMso,
- @Value("10") int pollingIntervalSeconds) {
+ @Value("10") int pollingIntervalSeconds,
+ VersionService versionService) {
// tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
this.dataAccessService = dataAccessService;
this.sessionFactory = sessionFactory;
this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
this.pollingIntervalSeconds = pollingIntervalSeconds;
+ this.versionService = versionService;
}
@PostConstruct
@@ -78,6 +85,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
@Override
public UUID add(Job job) {
final JobDaoImpl jobDao = castToJobDaoImpl(job);
+ jobDao.setBuild(versionService.retrieveBuildNumber());
dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
return job.getUuid();
}
@@ -120,7 +128,11 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
}
private java.sql.Timestamp nowMinusInterval() {
- return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
+ return nowMinusInterval(pollingIntervalSeconds);
+ }
+
+ private java.sql.Timestamp nowMinusInterval(long seconds) {
+ return Timestamp.valueOf(LocalDateTime.now().minusSeconds(seconds));
}
private String selectQueryByJobStatus(Job.JobStatus topic){
@@ -130,17 +142,23 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
"select * from VID_JOB" +
" where" +
// select only non-deleted in-progress jobs
- " JOB_STATUS = '" + topic + "'" +
- " and TAKEN_BY is null" +
- " and DELETED_AT is null" +
+ filterByStatusNotTakenNotDeleted(topic) +
// give some breath, don't select jos that were recently reached
- intervalCondition +
+ intervalCondition +
// take the oldest handled one
" order by MODIFIED_DATE ASC" +
// select only one result
" limit 1";
}
+ @NotNull
+ private String filterByStatusNotTakenNotDeleted(Job.JobStatus topic) {
+ return " JOB_STATUS = '" + topic + "'" +
+ " and TAKEN_BY is null" +
+ " and DELETED_AT is null "+
+ " and BUILD = '"+ versionService.retrieveBuildNumber() +"'";
+ }
+
private String sqlQueryForTopic(Job.JobStatus topic) {
switch (topic) {
case IN_PROGRESS:
@@ -148,44 +166,73 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
case CREATING:
return selectQueryByJobStatus(topic);
case PENDING:
- return "" +
- // select only pending jobs
- "select vid_job.* from VID_JOB " +
- // select users have in_progress jobs
- "left join \n" +
- " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
- "group by user_id) users_have_any_in_progress_job_tbl\n" +
- "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
- "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
- // job is not deleted
- " AND DELETED_AT is null and (\n" +
- // limit in-progress to some amount
- "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
- "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
- // don't take jobs from templates that already in-progress/failed
- "and TEMPLATE_Id not in \n" +
- "(select TEMPLATE_Id from vid_job where" +
- " TEMPLATE_Id IS NOT NULL and("+
- " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
- " or JOB_STATUS='IN_PROGRESS'" +
- " or TAKEN_BY IS NOT NULL))" + " \n " +
- // prefer older jobs, but the earlier in each bulk
- "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
- // select only one result
- "limit 1";
+ return selectQueryForPendingJob();
+ case PENDING_RESOURCE:
+ return selectQueryForPendingResource();
default:
throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
}
}
+ @NotNull
+ private String selectQueryForPendingJob() {
+ return "" +
+ // select only pending jobs
+ "select vid_job.* from VID_JOB " +
+ // select users have in_progress jobs
+ "left join \n" +
+ " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
+ "group by user_id) users_have_any_in_progress_job_tbl\n" +
+ "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
+ "where "+filterByStatusNotTakenNotDeleted(Job.JobStatus.PENDING)+" and (\n" +
+ // limit in-progress to some amount
+ "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
+ "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
+ // don't take jobs from templates that already in-progress/failed
+ "and TEMPLATE_Id not in \n" +
+ "(select TEMPLATE_Id from vid_job where" +
+ " TEMPLATE_Id IS NOT NULL and("+
+ " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
+ " or JOB_STATUS='IN_PROGRESS'" +
+ " or TAKEN_BY IS NOT NULL))" + " \n " +
+ // prefer older jobs, but the earlier in each bulk
+ "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
+ // select only one result
+ "limit 1";
+ }
- private byte[] getUuidAsByteArray(UUID owner) {
- ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
- bb.putLong(owner.getMostSignificantBits());
- bb.putLong(owner.getLeastSignificantBits());
- return bb.array();
+ @NotNull
+ private String selectQueryForPendingResource() {
+ return "select * from vid_job as JOB left join \n" +
+ //count jobs
+ "(select template_id,count(*) as in_progress_count from vid_job \n" +
+ String.format("where (\n"+
+ " (\n"+
+ //with job_status IN_PROGRESS or RESOURCE_IN_PROGRESS
+ " (job_status in ('%s','%s') and DELETED_AT is NULL) \n",IN_PROGRESS, RESOURCE_IN_PROGRESS)+
+ //or that with job_status PENDING_RESOURCE that are taken
+ String.format(" or (JOB_STATUS='%s' and TAKEN_BY IS NOT NULL)\n )\n", PENDING_RESOURCE) +
+ //with template ID and are not deleted
+ " and TEMPLATE_ID IS NOT NULL and DELETED_AT is NULL\n)\n" +
+ //join them to vid_job by template_id
+ "group by template_id)\n"+
+ "as COUNTER on COUNTER.template_id=JOB.template_id \n" +
+
+ "where (\n"+
+ //select jobs with job_status PENDING_RESOURCE that are nit taken and not deleted
+ filterByStatusNotTakenNotDeleted(PENDING_RESOURCE) + "\n" +
+ //that have no count in the counter (no other in progress job with same templateId)
+ " and in_progress_count is NULL \n" +
+ //and that have valid templateId
+ " and JOB.template_id is not NULL \n"+
+ ")\n" +
+ //INDEX_IN_BULK is for order them inside same templateId,
+ //template_id - for order between different templateId (just to be deterministic)
+ "order by INDEX_IN_BULK,JOB.template_id \n" +
+ "limit 1;";
}
+
@Override
public void pushBack(Job job) {
final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
@@ -253,7 +300,7 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
throw new OperationNotAllowedException("Service does not exist");
}
- if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
+ if ((remoteDaoJob.getStatus() != Job.JobStatus.PENDING) && (remoteDaoJob.getStatus() != Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
", takenBy " + remoteDaoJob.getTakenBy());
throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
@@ -290,4 +337,17 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
return updatedEntities != 0;
}
+
+ private static String sqlListOfFinalStatus =
+ String.format("(%s)",
+ FINAL_STATUS.stream().
+ map(x->String.format("'%s'",x)).
+ collect(Collectors.joining(","))
+ );
+
+ @Override
+ public void deleteOldFinalJobs(long secondsAgo) {
+ String select = String.format(" MODIFIED_DATE <= '%s' and JOB_STATUS in %s", nowMinusInterval(secondsAgo), sqlListOfFinalStatus);
+ dataAccessService.deleteDomainObjects(JobDaoImpl.class, select, null);
+ }
}