aboutsummaryrefslogtreecommitdiffstats
path: root/vid-app-common/src/main/java/org/onap/vid/job/impl
diff options
context:
space:
mode:
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job/impl')
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java55
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java40
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java54
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java4
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java84
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java35
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java75
7 files changed, 273 insertions, 74 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java
index 59f12f4c..1050ab93 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java
@@ -4,16 +4,11 @@ import com.google.common.collect.ImmutableMap;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobAdapter;
import org.onap.vid.job.JobType;
-import org.onap.vid.model.JobBulk;
import org.onap.vid.model.JobModel;
import org.springframework.stereotype.Component;
-import javax.ws.rs.BadRequestException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
@Component
public class JobAdapterImpl implements JobAdapter {
@@ -28,44 +23,32 @@ public class JobAdapterImpl implements JobAdapter {
}
@Override
- public JobBulk toModelBulk(List<Job> jobList) {
- return new JobBulk(jobList.stream().map(this::toModel).collect(Collectors.toList()));
- }
-
- @Override
- public Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk) {
- JobDaoImpl job = createJob(jobType, templateId, indexInBulk, ImmutableMap.of(
- "request", request,
- "userId", userId));
- job.setUserId(userId);
+ public Job createServiceInstantiationJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, String optimisticUniqueServiceInstanceName, Integer indexInBulk){
+ JobDaoImpl job = createJob(jobType, Job.JobStatus.PENDING , userId);
+ job.setSharedData(new JobSharedData(job.getUuid(), userId, request));
+ job.setTypeAndData(jobType, ImmutableMap.of(
+ "optimisticUniqueServiceInstanceName", optimisticUniqueServiceInstanceName
+ ));
+ job.setTemplateId(templateId);
+ job.setIndexInBulk(indexInBulk);
return job;
}
@Override
- public List<Job> createBulkOfJobs(Map<String, Object> bulkRequest) {
- int count;
- JobType jobType;
-
- try {
- count = (Integer) bulkRequest.get("count");
- jobType = JobType.valueOf((String) bulkRequest.get("type"));
- } catch (Exception exception) {
- throw new BadRequestException(exception);
- }
- List<Job> jobList = new ArrayList<>(count + 1);
- UUID templateId = UUID.randomUUID();
- for (int i = 0; i < count; i++) {
- jobList.add(createJob(jobType, templateId, i, bulkRequest));
- }
- return jobList;
+ public Job createChildJob(JobType jobType, Job.JobStatus jobStatus, AsyncJobRequest request, JobSharedData parentSharedData, Map<String, Object> jobData) {
+ JobDaoImpl job = createJob(jobType, jobStatus , parentSharedData.getUserId());
+ job.setSharedData(new JobSharedData(job.getUuid(), request, parentSharedData));
+ job.setTypeAndData(jobType, jobData);
+ return job;
}
- private JobDaoImpl createJob(JobType jobType, UUID templateId, Integer indexInBulk, Map<String, Object> data) {
+ protected JobDaoImpl createJob(JobType jobType, Job.JobStatus jobStatus, String userId) {
JobDaoImpl job = new JobDaoImpl();
- job.setStatus(Job.JobStatus.PENDING);
- job.setTypeAndData(jobType, data);
- job.setTemplateId(templateId);
- job.setIndexInBulk(indexInBulk);
+ job.setTypeAndData(jobType, null);
+ job.setStatus(jobStatus);
+ job.setUuid(UUID.randomUUID());
+ job.setUserId(userId);
return job;
}
+
}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java
index 1ff1296c..b301a2fb 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java
@@ -3,19 +3,31 @@ package org.onap.vid.job.impl;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.MoreObjects;
+import org.hibernate.annotations.DynamicUpdate;
+import org.hibernate.annotations.SelectBeforeUpdate;
import org.hibernate.annotations.Type;
import org.onap.vid.exceptions.GenericUncheckedException;
import org.onap.vid.job.Job;
+import org.onap.vid.job.JobException;
import org.onap.vid.job.JobType;
import org.onap.vid.model.VidBaseEntity;
import javax.persistence.*;
import java.io.IOException;
-import java.util.*;
-
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/*
+ The following 2 annotations let hibernate to update only fields that actually have been changed.
+ DynamicUpdate tell hibernate to update only dirty fields.
+ SelectBeforeUpdate is needed since during update the entity is detached (get and update are in different sessions)
+*/
+@DynamicUpdate()
+@SelectBeforeUpdate()
@Entity
@Table(name = "vid_job")
public class JobDaoImpl extends VidBaseEntity implements Job {
@@ -23,7 +35,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job {
private static ObjectMapper objectMapper = new ObjectMapper();
private Job.JobStatus status;
private JobType type;
- private Map<JobType, Map<String, Object>> data = new TreeMap<>();
+ private JobData data = new JobData();
private UUID templateId;
private UUID uuid;
private String takenBy;
@@ -83,16 +95,25 @@ public class JobDaoImpl extends VidBaseEntity implements Job {
public void setDataRaw(String data) {
try {
- this.data = objectMapper.readValue(data, new TypeReference<Map<JobType, Map<String, Object>>>() {
- });
+ this.data = objectMapper.readValue(data, JobData.class);
} catch (IOException e) {
- throw new GenericUncheckedException(e);
+ throw new JobException("Error parsing job's data", uuid, e);
}
}
@Transient
public Map<String, Object> getData() {
- return data.get(getType());
+ return data.getCommandData().get(getType());
+ }
+
+ public void setSharedData(JobSharedData sharedData) {
+ this.data.setSharedData(sharedData);
+ }
+
+ @Override
+ @Transient
+ public JobSharedData getSharedData() {
+ return this.data.getSharedData();
}
@Override
@@ -100,7 +121,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job {
// *add* the data to map,
// then change state to given type
this.type = jobType;
- this.data.put(jobType, data);
+ this.data.getCommandData().put(jobType, data);
}
@Column(name = "TAKEN_BY")
@@ -123,6 +144,7 @@ public class JobDaoImpl extends VidBaseEntity implements Job {
this.templateId = templateId;
}
+ @Override
@Column(name="INDEX_IN_BULK")
public Integer getIndexInBulk() {
return indexInBulk;
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java
new file mode 100644
index 00000000..0b7a6b7c
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobData.java
@@ -0,0 +1,54 @@
+package org.onap.vid.job.impl;
+
+import org.onap.vid.job.JobType;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+public class JobData {
+
+ private TreeMap<JobType, Map<String, Object>> commandData;
+ private JobSharedData sharedData;
+
+ public JobData() {
+ commandData = new TreeMap<>();
+ sharedData = new JobSharedData();
+ }
+
+ public JobData(TreeMap<JobType, Map<String, Object>> commandData, JobSharedData sharedData) {
+ this.commandData = commandData;
+ this.sharedData = sharedData;
+ }
+
+ public TreeMap<JobType, Map<String, Object>> getCommandData() {
+ return commandData;
+ }
+
+ public void setCommandData(TreeMap<JobType, Map<String, Object>> commandData) {
+ this.commandData = commandData;
+ }
+
+ public JobSharedData getSharedData() {
+ return sharedData;
+ }
+
+ public void setSharedData(JobSharedData sharedData) {
+ this.sharedData = sharedData;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof JobData)) return false;
+ JobData jobData = (JobData) o;
+ return Objects.equals(getCommandData(), jobData.getCommandData()) &&
+ Objects.equals(getSharedData(), jobData.getSharedData());
+ }
+
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(getCommandData(), getSharedData());
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java
index a5070fbd..59b2f250 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java
@@ -1,12 +1,12 @@
package org.onap.vid.job.impl;
import com.google.common.collect.ImmutableMap;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.onap.vid.exceptions.GenericUncheckedException;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobsBrokerService;
import org.onap.vid.job.command.JobCommandFactory;
import org.onap.vid.properties.Features;
-import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
@@ -46,7 +46,9 @@ public class JobSchedulerInitializer {
return;
}
scheduleJobWorker(Job.JobStatus.PENDING, 1);
+ scheduleJobWorker(Job.JobStatus.CREATING, 1);
scheduleJobWorker(Job.JobStatus.IN_PROGRESS, 1);
+ scheduleJobWorker(Job.JobStatus.RESOURCE_IN_PROGRESS, 1);
}
private void scheduleJobWorker(Job.JobStatus topic, int intervalInSeconds) {
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java
new file mode 100644
index 00000000..8f1e4573
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSharedData.java
@@ -0,0 +1,84 @@
+package org.onap.vid.job.impl;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.onap.vid.job.JobAdapter;
+
+import java.util.Objects;
+import java.util.UUID;
+
+public class JobSharedData {
+
+ protected UUID jobUuid;
+ protected String userId;
+ protected Class requestType;
+ protected UUID rootJobId;
+
+ @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, property="class")
+ protected JobAdapter.AsyncJobRequest request;
+
+ public JobSharedData() {
+ }
+
+ public JobSharedData(UUID jobUuid, String userId, JobAdapter.AsyncJobRequest request) {
+ this.jobUuid = jobUuid;
+ this.userId = userId;
+ this.requestType = request.getClass();
+ this.request = request;
+ this.rootJobId = jobUuid;
+ }
+
+ public JobSharedData(UUID jobUuid, JobAdapter.AsyncJobRequest request, JobSharedData parentData) {
+ this(jobUuid, parentData.getUserId(), request);
+ rootJobId = parentData.getRootJobId() != null ? parentData.getRootJobId() : parentData.getJobUuid();
+ }
+
+
+ public UUID getJobUuid() {
+ return jobUuid;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public Class getRequestType() {
+ return requestType;
+ }
+
+ public void setRequestType(Class requestType) {
+ this.requestType = requestType;
+ }
+
+ public JobAdapter.AsyncJobRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(JobAdapter.AsyncJobRequest request) {
+ this.request = request;
+ }
+
+ public UUID getRootJobId() {
+ return rootJobId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof JobSharedData)) return false;
+ JobSharedData that = (JobSharedData) o;
+ return Objects.equals(getJobUuid(), that.getJobUuid()) &&
+ Objects.equals(getUserId(), that.getUserId()) &&
+ Objects.equals(getRequestType(), that.getRequestType()) &&
+ Objects.equals(getRootJobId(), that.getRootJobId()) &&
+ Objects.equals(getRequest(), that.getRequest());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getJobUuid(), getUserId(), getRequestType(), getRootJobId(), getRequest());
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java
index aa94a2aa..36ec4314 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java
@@ -1,13 +1,11 @@
package org.onap.vid.job.impl;
import org.apache.commons.lang3.StringUtils;
-import org.onap.vid.job.Job;
-import org.onap.vid.job.JobCommand;
-import org.onap.vid.job.JobsBrokerService;
-import org.onap.vid.job.NextCommand;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.onap.vid.job.*;
import org.onap.vid.job.command.JobCommandFactory;
import org.onap.vid.properties.Features;
-import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
import org.quartz.JobExecutionContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
@@ -52,6 +50,8 @@ public class JobWorker extends QuartzJobBean {
return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
} catch (Exception e) {
LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
+ tryMutingJobFromException(e);
+
return Optional.empty();
}
}
@@ -72,9 +72,7 @@ public class JobWorker extends QuartzJobBean {
NextCommand nextCommand = executeCommandAndGetNext(job);
- Job nextJob = setNextCommandInJob(nextCommand, job);
-
- return nextJob;
+ return setNextCommandInJob(nextCommand, job);
}
private NextCommand executeCommandAndGetNext(Job job) {
@@ -83,7 +81,7 @@ public class JobWorker extends QuartzJobBean {
final JobCommand jobCommand = jobCommandFactory.toCommand(job);
nextCommand = jobCommand.call();
} catch (Exception e) {
- LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e);
+ LOGGER.error("error while executing job from queue: {}", e);
nextCommand = new NextCommand(FAILED);
}
@@ -114,6 +112,25 @@ public class JobWorker extends QuartzJobBean {
return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION);
}
+ private void tryMutingJobFromException(Exception e) {
+ // If there's JobException in the stack, read job uuid from
+ // the exception, and mute it in DB.
+ final int indexOfJobException =
+ ExceptionUtils.indexOfThrowable(e, JobException.class);
+
+ if (indexOfJobException >= 0) {
+ try {
+ final JobException jobException = (JobException) ExceptionUtils.getThrowableList(e).get(indexOfJobException);
+ LOGGER.info(EELFLoggerDelegate.debugLogger, "muting job: {} ({})", jobException.getJobUuid(), jobException.toString());
+ final boolean success = jobsBrokerService.mute(jobException.getJobUuid());
+ if (!success) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job {}", jobException.getJobUuid());
+ }
+ } catch (Exception e1) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to mute job: {}", e1, e1);
+ }
+ }
+ }
//used by quartz to inject JobsBrokerService into the job
//see JobSchedulerInitializer
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
index e286cc4a..e8747879 100644
--- a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
@@ -2,15 +2,15 @@ package org.onap.vid.job.impl;
import org.apache.commons.lang3.StringUtils;
import org.hibernate.SessionFactory;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.onap.portalsdk.core.service.DataAccessService;
+import org.onap.portalsdk.core.util.SystemProperties;
import org.onap.vid.exceptions.GenericUncheckedException;
import org.onap.vid.exceptions.OperationNotAllowedException;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobsBrokerService;
import org.onap.vid.properties.VidProperties;
import org.onap.vid.utils.DaoUtils;
-import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
-import org.onap.portalsdk.core.service.DataAccessService;
-import org.onap.portalsdk.core.util.SystemProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -21,6 +21,8 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
+import static org.onap.vid.job.Job.JobStatus.CREATING;
+
@Service
public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
@@ -56,7 +58,6 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
@Override
public UUID add(Job job) {
final JobDaoImpl jobDao = castToJobDaoImpl(job);
- jobDao.setUuid(UUID.randomUUID());
dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
return job.getUuid();
}
@@ -102,23 +103,30 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
}
+ private String selectQueryByJobStatus(Job.JobStatus topic){
+
+ String intervalCondition = (topic==CREATING) ? "" : (" and MODIFIED_DATE <= '" + nowMinusInterval()+"'");
+ return "" +
+ "select * from VID_JOB" +
+ " where" +
+ // select only non-deleted in-progress jobs
+ " JOB_STATUS = '" + topic + "'" +
+ " and TAKEN_BY is null" +
+ " and DELETED_AT is null" +
+ // give some breath, don't select jos that were recently reached
+ intervalCondition +
+ // take the oldest handled one
+ " order by MODIFIED_DATE ASC" +
+ // select only one result
+ " limit 1";
+ }
+
private String sqlQueryForTopic(Job.JobStatus topic) {
switch (topic) {
case IN_PROGRESS:
- return "" +
- "select * from VID_JOB" +
- " where" +
- // select only non-deleted in-progress jobs
- " JOB_STATUS = 'IN_PROGRESS'" +
- " and TAKEN_BY is null" +
- " and DELETED_AT is null" +
- // give some breath, don't select jos that were recently reached
- " and MODIFIED_DATE <= '" + nowMinusInterval() +
- // take the oldest handled one
- "' order by MODIFIED_DATE ASC" +
- // select only one result
- " limit 1";
-
+ case RESOURCE_IN_PROGRESS:
+ case CREATING:
+ return selectQueryByJobStatus(topic);
case PENDING:
return "" +
// select only pending jobs
@@ -137,9 +145,10 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
// don't take jobs from templates that already in-progress/failed
"and TEMPLATE_Id not in \n" +
"(select TEMPLATE_Id from vid_job where" +
+ " TEMPLATE_Id IS NOT NULL and("+
" (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
" or JOB_STATUS='IN_PROGRESS'" +
- " or TAKEN_BY IS NOT NULL)" + " \n " +
+ " or TAKEN_BY IS NOT NULL))" + " \n " +
// prefer older jobs, but the earlier in each bulk
"order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
// select only one result
@@ -233,4 +242,32 @@ public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
throw new OperationNotAllowedException("Service deletion failed");
}
}
+
+ @Override
+ public boolean mute(UUID jobId) {
+ if (jobId == null) {
+ return false;
+ }
+
+ final String prefix = "DUMP";
+ int updatedEntities;
+
+ // Changing the topic (i.e. `job.status`) makes the job non-fetchable.
+ String hqlUpdate = "" +
+ "update JobDaoImpl job set" +
+ " job.status = concat('" + prefix + "_', job.status)," +
+ // empty `takenBy`, because some logics treat taken as in-progress
+ " takenBy = null" +
+ " where " +
+ " job.id = :id" +
+ // if prefix already on the topic -- no need to do it twice.
+ " and job.status NOT LIKE '" + prefix + "\\_%'";
+
+ updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
+ session.createQuery(hqlUpdate)
+ .setText("id", jobId.toString())
+ .executeUpdate());
+
+ return updatedEntities != 0;
+ }
}