aboutsummaryrefslogtreecommitdiffstats
path: root/vid-app-common/src/main/java/org/onap/vid/job
diff options
context:
space:
mode:
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job')
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/Job.java47
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java26
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java41
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/JobType.java31
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java21
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java25
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java26
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java52
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java110
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java43
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java25
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java140
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java72
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java192
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java76
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java135
-rw-r--r--vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java236
17 files changed, 1298 insertions, 0 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/Job.java b/vid-app-common/src/main/java/org/onap/vid/job/Job.java
new file mode 100644
index 000000000..77f348dc6
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/Job.java
@@ -0,0 +1,47 @@
+package org.onap.vid.job;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.Map;
+import java.util.UUID;
+
+public interface Job {
+
+ UUID getUuid();
+
+ void setUuid(UUID uuid);
+
+ JobStatus getStatus();
+
+ void setStatus(JobStatus status);
+
+ @JsonIgnore
+ Map<String, Object> getData();
+
+ void setTypeAndData(JobType jobType, Map<String, Object> data);
+
+ UUID getTemplateId();
+
+ void setTemplateId(UUID templateId);
+
+ void setIndexInBulk(Integer indexInBulk);
+
+ JobType getType();
+
+ enum JobStatus {
+ COMPLETED(true),
+ FAILED(true),
+ IN_PROGRESS(false),
+ PAUSE(false),
+ PENDING(false),
+ STOPPED(true);
+
+ private final Boolean finalStatus;
+ public Boolean isFinal(){return finalStatus;}
+
+ JobStatus(Boolean finalStatus)
+ {
+ this.finalStatus = finalStatus ;
+ }
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java b/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java
new file mode 100644
index 000000000..1701092b3
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java
@@ -0,0 +1,26 @@
+package org.onap.vid.job;
+
+import org.onap.vid.model.JobBulk;
+import org.onap.vid.model.JobModel;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * kind of factory for creating jobs and converting them to Job Model
+ */
+public interface JobAdapter {
+ JobModel toModel(Job job);
+
+ JobBulk toModelBulk(List<Job> jobList);
+
+ List<Job> createBulkOfJobs(Map<String, Object> bulkRequest);
+
+ Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk);
+
+ // Marks types that are an AsyncJob payload
+ public interface AsyncJobRequest {
+ }
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java
new file mode 100644
index 000000000..c32645cad
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java
@@ -0,0 +1,41 @@
+package org.onap.vid.job;
+
+import java.util.Map;
+import java.util.UUID;
+
+
+/**
+ * A callable instance, with serializable characteristics.
+ * Represents a step in a chain of steps, which eventualy
+ * resides into a packing Job.
+ */
+public interface JobCommand {
+
+ /**
+ * Initialize the command state
+ * @param jobUuid Parent job's uuid
+ * @param data An input to be set into the command. Each implementation may expect different keys in the map.
+ * @return Returns itself
+ */
+ default JobCommand init(UUID jobUuid, Map<String, Object> data) {
+ return this;
+ }
+
+ /**
+ * @return Returns the inner state of the command. This state, once passed into init(), should
+ * bring the command back to it's state.
+ */
+ Map<String, Object> getData();
+
+ /**
+ * Execute the command represented by this instance. Assumes the instance is already init().
+ * @return A NextCommand containing the next command in chain of commands, or null if chain
+ * should be terminated. Might return itself (packed in a NextCommand).
+ */
+ NextCommand call();
+
+ default JobType getType() {
+ return JobType.jobTypeOf(this.getClass());
+ }
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobType.java b/vid-app-common/src/main/java/org/onap/vid/job/JobType.java
new file mode 100644
index 000000000..9846d2775
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/JobType.java
@@ -0,0 +1,31 @@
+package org.onap.vid.job;
+
+import org.onap.vid.job.command.*;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public enum JobType {
+
+ HttpCall(HttpCallCommand.class),
+ AggregateState(AggregateStateCommand.class),
+ ServiceInstantiation(ServiceInstantiationCommand.class),
+ InProgressStatus(InProgressStatusCommand.class),
+ NoOp(NoOpCommand.class);
+
+ private static final Map<Class, JobType> REVERSE_MAP = Stream.of(values()).collect(Collectors.toMap(t -> t.getCommandClass(), t -> t));
+
+ private final Class commandClass;
+
+ <T extends JobCommand> JobType(Class<T> commandClass) {
+ this.commandClass = commandClass;
+ }
+
+ public Class getCommandClass() {
+ return commandClass;
+ }
+ static JobType jobTypeOf(Class commandClass) {
+ return REVERSE_MAP.get(commandClass);
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java b/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java
new file mode 100644
index 000000000..856f50b2e
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java
@@ -0,0 +1,21 @@
+package org.onap.vid.job;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.UUID;
+
+public interface JobsBrokerService {
+
+ UUID add(Job job);
+
+ Optional<Job> pull(Job.JobStatus topic, String ownerId);
+
+ void pushBack(Job job);
+
+ Collection<Job> peek();
+
+ Job peek(UUID jobId);
+
+ void delete(UUID jobId);
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java
new file mode 100644
index 000000000..55f375bb7
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java
@@ -0,0 +1,25 @@
+package org.onap.vid.job;
+
+public class NextCommand {
+ private final Job.JobStatus status;
+ private final JobCommand command;
+
+ public NextCommand(Job.JobStatus nextStatus, JobCommand nextCommand) {
+ this.status = nextStatus;
+ this.command = nextCommand;
+ }
+
+ public NextCommand(Job.JobStatus nextStatus) {
+ this.status = nextStatus;
+ this.command = null;
+ }
+
+ public Job.JobStatus getStatus() {
+ return status;
+ }
+
+ public JobCommand getCommand() {
+ return command;
+ }
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java
new file mode 100644
index 000000000..65aadf3be
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java
@@ -0,0 +1,26 @@
+package org.onap.vid.job.command;
+
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.NextCommand;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.Map;
+
+@Component
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class AggregateStateCommand implements JobCommand {
+
+ @Override
+ public NextCommand call() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> getData() {
+ return Collections.emptyMap();
+ }
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java
new file mode 100644
index 000000000..5951d7c83
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java
@@ -0,0 +1,52 @@
+package org.onap.vid.job.command;
+
+import com.google.common.collect.ImmutableMap;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.NextCommand;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.UUID;
+
+
+@Component
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class HttpCallCommand implements JobCommand {
+ private String url;
+ private UUID uuid;
+
+ public HttpCallCommand() {
+ }
+
+ public HttpCallCommand(String url, UUID uuid) {
+ init(url, uuid);
+ }
+
+ @Override
+ public NextCommand call() {
+ final Response response = ClientBuilder.newClient().target(url).request().post(Entity.text(uuid.toString()));
+ return new NextCommand(Job.JobStatus.COMPLETED);
+ }
+
+ @Override
+ public HttpCallCommand init(UUID jobUuid, Map<String, Object> data) {
+ return init((String) data.get("url"), jobUuid);
+ }
+
+ private HttpCallCommand init(String url, UUID jobUuid) {
+ this.url = url;
+ this.uuid = jobUuid;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getData() {
+ return ImmutableMap.of("url", url);
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java
new file mode 100644
index 000000000..64c782c00
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java
@@ -0,0 +1,110 @@
+package org.onap.vid.job.command;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.onap.vid.job.Job.JobStatus;
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.NextCommand;
+import org.onap.vid.mso.RestMsoImplementation;
+import org.onap.vid.mso.RestObject;
+import org.onap.vid.mso.rest.AsyncRequestStatus;
+import org.onap.vid.services.AsyncInstantiationBusinessLogic;
+import org.onap.vid.services.AuditService;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.UUID;
+
+
+@Component
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class InProgressStatusCommand implements JobCommand {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(InProgressStatusCommand.class);
+
+ @Inject
+ private AsyncInstantiationBusinessLogic asyncInstantiationBL;
+
+ @Inject
+ private RestMsoImplementation restMso;
+
+ @Inject
+ private AuditService auditService;
+
+ private String requestId;
+
+ private UUID jobUuid;
+
+ public InProgressStatusCommand() {
+ }
+
+ InProgressStatusCommand(UUID jobUuid, String requestId) {
+ init(jobUuid, requestId);
+ }
+
+ @Override
+ public NextCommand call() {
+
+ try {
+ String path = asyncInstantiationBL.getOrchestrationRequestsPath()+"/"+requestId;
+ RestObject<AsyncRequestStatus> msoResponse = restMso.GetForObject("", path, AsyncRequestStatus.class);
+ JobStatus jobStatus;
+ if (msoResponse.getStatusCode() >= 400 || msoResponse.get() == null) {
+ auditService.setFailedAuditStatusFromMso(jobUuid, requestId, msoResponse.getStatusCode(), msoResponse.getRaw());
+ LOGGER.error(EELFLoggerDelegate.errorLogger,
+ "Failed to get orchestration status for {}. Status code: {}, Body: {}",
+ requestId, msoResponse.getStatusCode(), msoResponse.getRaw());
+ return new NextCommand(JobStatus.IN_PROGRESS, this);
+ }
+ else {
+ jobStatus = asyncInstantiationBL.calcStatus(msoResponse.get());
+ }
+
+ asyncInstantiationBL.auditMsoStatus(jobUuid,msoResponse.get().request);
+
+
+ if (jobStatus == JobStatus.FAILED) {
+ asyncInstantiationBL.handleFailedInstantiation(jobUuid);
+ }
+ else {
+ asyncInstantiationBL.updateServiceInfoAndAuditStatus(jobUuid, jobStatus);
+ }
+ //in case of JobStatus.PAUSE we leave the job itself as IN_PROGRESS, for keep tracking job progress
+ if (jobStatus == JobStatus.PAUSE) {
+ return new NextCommand(JobStatus.IN_PROGRESS, this);
+ }
+ return new NextCommand(jobStatus, this);
+ } catch (javax.ws.rs.ProcessingException e) {
+ // Retry when we can't connect MSO during getStatus
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "Cannot get orchestration status for {}, will retry: {}", requestId, e, e);
+ return new NextCommand(JobStatus.IN_PROGRESS, this);
+ } catch (RuntimeException e) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "Cannot get orchestration status for {}, stopping: {}", requestId, e, e);
+ return new NextCommand(JobStatus.STOPPED, this);
+ }
+ }
+
+ @Override
+ public InProgressStatusCommand init(UUID jobUuid, Map<String, Object> data) {
+ return init(jobUuid, (String) data.get("requestId"));
+ }
+
+ private InProgressStatusCommand init(UUID jobUuid, String requestId) {
+ this.requestId = requestId;
+ this.jobUuid = jobUuid;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getData() {
+ return ImmutableMap.of("requestId", requestId);
+ }
+
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java b/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java
new file mode 100644
index 000000000..1e613c58b
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java
@@ -0,0 +1,43 @@
+package org.onap.vid.job.command;
+
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobCommand;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.function.Function;
+
+@Component
+public class JobCommandFactory {
+
+ final Function<Class<? extends JobCommand>, JobCommand> jobFactory;
+
+ @Inject
+ public JobCommandFactory(ApplicationContext applicationContext) {
+ this.jobFactory = (jobType -> {
+ final Object commandBean = applicationContext.getBean(jobType);
+
+ if (!(commandBean instanceof JobCommand)) {
+ throw new GenericUncheckedException(commandBean.getClass() + " is not a JobCommand");
+ }
+
+ return (JobCommand) commandBean;
+ });
+ }
+
+ public JobCommandFactory(Function<Class<? extends JobCommand>, JobCommand> jobFactory) {
+ this.jobFactory = jobFactory;
+ }
+
+ public JobCommand toCommand(Job job) {
+
+ final JobCommand command = jobFactory.apply(job.getType().getCommandClass());
+ command.init(job.getUuid(), job.getData());
+
+ return command;
+ }
+
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java
new file mode 100644
index 000000000..37636fcaa
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java
@@ -0,0 +1,25 @@
+package org.onap.vid.job.command;
+
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.NextCommand;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.Map;
+
+@Component
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class NoOpCommand implements JobCommand {
+
+ @Override
+ public NextCommand call() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> getData() {
+ return Collections.emptyMap();
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java
new file mode 100644
index 000000000..6afacf23e
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java
@@ -0,0 +1,140 @@
+package org.onap.vid.job.command;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.onap.vid.aai.exceptions.InvalidAAIResponseException;
+import org.onap.vid.changeManagement.RequestDetailsWrapper;
+import org.onap.vid.exceptions.MaxRetriesException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.NextCommand;
+import org.onap.vid.model.RequestReferencesContainer;
+import org.onap.vid.model.serviceInstantiation.ServiceInstantiation;
+import org.onap.vid.mso.RestMsoImplementation;
+import org.onap.vid.mso.RestObject;
+import org.onap.vid.mso.model.ServiceInstantiationRequestDetails;
+import org.onap.vid.services.AsyncInstantiationBusinessLogic;
+import org.onap.vid.services.AuditService;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.UUID;
+
+
+@Component
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class ServiceInstantiationCommand implements JobCommand {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(ServiceInstantiationCommand.class);
+
+ @Inject
+ private AsyncInstantiationBusinessLogic asyncInstantiationBL;
+
+ @Inject
+ private AuditService auditService;
+
+ @Inject
+ private RestMsoImplementation restMso;
+
+ private UUID uuid;
+ private ServiceInstantiation serviceInstantiationRequest;
+ private String userId;
+
+ public ServiceInstantiationCommand() {
+ }
+
+ public ServiceInstantiationCommand(UUID uuid, ServiceInstantiation serviceInstantiationRequest, String userId) {
+ init(uuid, serviceInstantiationRequest, userId);
+ }
+
+ @Override
+ public NextCommand call() {
+ RequestDetailsWrapper<ServiceInstantiationRequestDetails> requestDetailsWrapper ;
+ try {
+ requestDetailsWrapper = asyncInstantiationBL.generateServiceInstantiationRequest(
+ uuid, serviceInstantiationRequest, userId
+ );
+ }
+
+ //Aai return bad response while checking names uniqueness
+ catch (InvalidAAIResponseException exception) {
+ LOGGER.error("Failed to check name uniqueness in AAI. VID will try again later", exception);
+ //put the job in_progress so we will keep trying to check name uniqueness in AAI
+ //And then send the request to MSO
+ return new NextCommand(Job.JobStatus.IN_PROGRESS, this);
+ }
+
+ //Vid reached to max retries while trying to find unique name in AAI
+ catch (MaxRetriesException exception) {
+ LOGGER.error("Failed to find unused name in AAI. Set the job to FAILED ", exception);
+ return handleCommandFailed();
+ }
+
+ String path = asyncInstantiationBL.getServiceInstantiationPath(serviceInstantiationRequest);
+
+ RestObject<RequestReferencesContainer> msoResponse = restMso.PostForObject(requestDetailsWrapper, "",
+ path, RequestReferencesContainer.class);
+
+ if (msoResponse.getStatusCode() >= 200 && msoResponse.getStatusCode() < 400) {
+ final Job.JobStatus jobStatus = Job.JobStatus.IN_PROGRESS;
+ final String requestId = msoResponse.get().getRequestReferences().getRequestId();
+ final String instanceId = msoResponse.get().getRequestReferences().getInstanceId();
+ asyncInstantiationBL.auditVidStatus(uuid, jobStatus);
+ setInitialRequestAuditStatusFromMso(requestId);
+ asyncInstantiationBL.updateServiceInfo(uuid, x-> {
+ x.setJobStatus(jobStatus);
+ x.setServiceInstanceId(instanceId);
+ });
+
+ return new NextCommand(jobStatus, new InProgressStatusCommand(uuid, requestId));
+ } else {
+ auditService.setFailedAuditStatusFromMso(uuid,null, msoResponse.getStatusCode(),msoResponse.getRaw());
+ return handleCommandFailed();
+ }
+
+ }
+
+ private void setInitialRequestAuditStatusFromMso(String requestId){
+ final String initialMsoRequestStatus = "REQUESTED";
+ asyncInstantiationBL.auditMsoStatus(uuid,initialMsoRequestStatus,requestId,null);
+ }
+
+ protected NextCommand handleCommandFailed() {
+ asyncInstantiationBL.handleFailedInstantiation(uuid);
+ return new NextCommand(Job.JobStatus.FAILED);
+ }
+
+ @Override
+ public ServiceInstantiationCommand init(UUID jobUuid, Map<String, Object> data) {
+ final Object request = data.get("request");
+
+ return init(
+ jobUuid,
+ OBJECT_MAPPER.convertValue(request, ServiceInstantiation.class),
+ (String) data.get("userId")
+ );
+ }
+
+ private ServiceInstantiationCommand init(UUID jobUuid, ServiceInstantiation serviceInstantiationRequest, String userId) {
+ this.uuid = jobUuid;
+ this.serviceInstantiationRequest = serviceInstantiationRequest;
+ this.userId = userId;
+
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getData() {
+ return ImmutableMap.of(
+ "uuid", uuid,
+ "request", serviceInstantiationRequest,
+ "userId", userId
+ );
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java
new file mode 100644
index 000000000..77e1dd2cf
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java
@@ -0,0 +1,72 @@
+package org.onap.vid.job.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobAdapter;
+import org.onap.vid.job.JobType;
+import org.onap.vid.model.JobBulk;
+import org.onap.vid.model.JobModel;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.BadRequestException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Component
+public class JobAdapterImpl implements JobAdapter {
+
+ @Override
+ public JobModel toModel(Job job) {
+ JobModel jobModel = new JobModel();
+ jobModel.setUuid(job.getUuid());
+ jobModel.setStatus(job.getStatus());
+ jobModel.setTemplateId(job.getTemplateId());
+ return jobModel;
+ }
+
+ @Override
+ public JobBulk toModelBulk(List<Job> jobList) {
+ return new JobBulk(jobList.stream().map(this::toModel).collect(Collectors.toList()));
+ }
+
+ @Override
+ public Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk){
+ JobDaoImpl job = new JobDaoImpl();
+ job.setStatus(Job.JobStatus.PENDING);
+ job.setTypeAndData(jobType, ImmutableMap.of(
+ "request", request,
+ "userId", userId));
+ job.setTemplateId(templateId);
+ job.setIndexInBulk(indexInBulk);
+ job.setUserId(userId);
+ return job;
+ }
+
+ @Override
+ public List<Job> createBulkOfJobs(Map<String, Object> bulkRequest) {
+ int count;
+ JobType jobType;
+
+ try {
+ count = (Integer) bulkRequest.get("count");
+ jobType = JobType.valueOf((String) bulkRequest.get("type"));
+ } catch (Exception exception) {
+ throw new BadRequestException(exception);
+ }
+ List<Job> jobList = new ArrayList<>(count + 1);
+ UUID templateId = UUID.randomUUID();
+ for (int i = 0; i < count; i++) {
+ Job child = new JobDaoImpl();
+ child.setTypeAndData(jobType, bulkRequest);
+ child.setStatus(Job.JobStatus.PENDING);
+ child.setTemplateId(templateId);
+ child.setIndexInBulk(i);
+ jobList.add(child);
+ }
+ return jobList;
+ }
+
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java
new file mode 100644
index 000000000..1ff1296c7
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java
@@ -0,0 +1,192 @@
+package org.onap.vid.job.impl;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.MoreObjects;
+import org.hibernate.annotations.Type;
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobType;
+import org.onap.vid.model.VidBaseEntity;
+
+import javax.persistence.*;
+import java.io.IOException;
+import java.util.*;
+
+@Entity
+@Table(name = "vid_job")
+public class JobDaoImpl extends VidBaseEntity implements Job {
+
+ private static ObjectMapper objectMapper = new ObjectMapper();
+ private Job.JobStatus status;
+ private JobType type;
+ private Map<JobType, Map<String, Object>> data = new TreeMap<>();
+ private UUID templateId;
+ private UUID uuid;
+ private String takenBy;
+ private String userId;
+ private Integer age = 0;
+ private Integer indexInBulk = 0;
+ private Date deletedAt;
+
+ @Id
+ @Column(name = "JOB_ID", columnDefinition = "CHAR(36)")
+ @Type(type="org.hibernate.type.UUIDCharType")
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(UUID uuid) {
+ this.uuid = uuid;
+ }
+
+ //we use uuid instead id. So making id Transient
+ @Override
+ @Transient
+ @JsonIgnore
+ public Long getId() {
+ return this.getUuid().getLeastSignificantBits();
+ }
+
+ @Column(name = "JOB_STATUS")
+ @Enumerated(EnumType.STRING)
+ public Job.JobStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(Job.JobStatus status) {
+ this.status = status;
+ }
+
+ @Enumerated(EnumType.STRING)
+ @Column(name = "JOB_TYPE")
+ public JobType getType() {
+ return type;
+ }
+
+ public void setType(JobType type) {
+ this.type = type;
+ }
+
+ //the columnDefinition is relevant only for UT
+ @Column(name = "JOB_DATA", columnDefinition = "VARCHAR(30000)")
+ public String getDataRaw() {
+ try {
+ return objectMapper.writeValueAsString(data);
+ } catch (JsonProcessingException e) {
+ throw new GenericUncheckedException(e);
+ }
+ }
+
+ public void setDataRaw(String data) {
+ try {
+ this.data = objectMapper.readValue(data, new TypeReference<Map<JobType, Map<String, Object>>>() {
+ });
+ } catch (IOException e) {
+ throw new GenericUncheckedException(e);
+ }
+ }
+
+ @Transient
+ public Map<String, Object> getData() {
+ return data.get(getType());
+ }
+
+ @Override
+ public void setTypeAndData(JobType jobType, Map<String, Object> data) {
+ // *add* the data to map,
+ // then change state to given type
+ this.type = jobType;
+ this.data.put(jobType, data);
+ }
+
+ @Column(name = "TAKEN_BY")
+ public String getTakenBy() {
+ return takenBy;
+ }
+
+ public void setTakenBy(String takenBy) {
+ this.takenBy = takenBy;
+ }
+
+ @Type(type="org.hibernate.type.UUIDCharType")
+ @Column(name = "TEMPLATE_ID", columnDefinition = "CHAR(36)")
+ public UUID getTemplateId() {
+ return templateId;
+ }
+
+ @Override
+ public void setTemplateId(UUID templateId) {
+ this.templateId = templateId;
+ }
+
+ @Column(name="INDEX_IN_BULK")
+ public Integer getIndexInBulk() {
+ return indexInBulk;
+ }
+
+ @Override
+ public void setIndexInBulk(Integer indexInBulk) {
+ this.indexInBulk = indexInBulk;
+ }
+
+ @Column(name="USER_ID")
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ @Column(name="AGE")
+ public Integer getAge() {
+ return age;
+ }
+
+ public void setAge(Integer age) {
+ this.age = age;
+ }
+
+ @Column(name="DELETED_AT")
+ public Date getDeletedAt() {
+ return deletedAt;
+ }
+
+ public void setDeletedAt(Date deletedAt) {
+ this.deletedAt = deletedAt;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof JobDaoImpl)) return false;
+ JobDaoImpl daoJob = (JobDaoImpl) o;
+ return Objects.equals(getUuid(), daoJob.getUuid());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getUuid());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("status", status)
+ .add("type", type)
+ .add("templateId", templateId)
+ .add("uuid", uuid)
+ .add("takenBy", takenBy)
+ .add("userId", userId)
+ .add("age", age)
+ .add("created", created)
+ .add("modified", modified)
+ .add("deletedAt", deletedAt)
+ .add("data", data)
+ .toString();
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java
new file mode 100644
index 000000000..a5070fbdf
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java
@@ -0,0 +1,76 @@
+package org.onap.vid.job.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobsBrokerService;
+import org.onap.vid.job.command.JobCommandFactory;
+import org.onap.vid.properties.Features;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.quartz.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.stereotype.Component;
+import org.togglz.core.manager.FeatureManager;
+
+import javax.annotation.PostConstruct;
+
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+
+@Component
+public class JobSchedulerInitializer {
+
+ private JobsBrokerService jobsBrokerService;
+ private SchedulerFactoryBean schedulerFactoryBean;
+ private FeatureManager featureManager;
+ private JobCommandFactory jobCommandFactory;
+ private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobSchedulerInitializer.class);
+
+ @Autowired
+ public JobSchedulerInitializer(
+ JobsBrokerService jobsBrokerService,
+ SchedulerFactoryBean schedulerFactoryBean,
+ FeatureManager featureManager,
+ JobCommandFactory JobCommandFactory
+ ) {
+ this.jobsBrokerService = jobsBrokerService;
+ this.schedulerFactoryBean = schedulerFactoryBean;
+ this.featureManager = featureManager;
+ this.jobCommandFactory = JobCommandFactory;
+
+ }
+
+ @PostConstruct
+ public void init() {
+ if (!featureManager.isActive(Features.FLAG_ASYNC_JOBS)) {
+ return;
+ }
+ scheduleJobWorker(Job.JobStatus.PENDING, 1);
+ scheduleJobWorker(Job.JobStatus.IN_PROGRESS, 1);
+ }
+
+ private void scheduleJobWorker(Job.JobStatus topic, int intervalInSeconds) {
+ Scheduler scheduler = schedulerFactoryBean.getScheduler();
+ JobDetail jobDetail = JobBuilder.newJob().ofType(JobWorker.class)
+ .withIdentity("AsyncWorkersJob" + topic)
+ .withDescription("Job that run async worker for " + topic)
+ .setJobData(new JobDataMap(ImmutableMap.of(
+ "jobsBrokerService", jobsBrokerService,
+ "jobCommandFactory", jobCommandFactory,
+ "featureManager", featureManager,
+ "topic", topic
+ )))
+ .build();
+ Trigger asyncWorkerTrigger = TriggerBuilder.newTrigger().forJob(jobDetail)
+ .withIdentity("AsyncWorkersTrigger" + topic)
+ .withDescription("Trigger to run async worker for " + topic)
+ .withSchedule(simpleSchedule().repeatForever().withIntervalInSeconds(intervalInSeconds))
+ .build();
+ try {
+ scheduler.scheduleJob(jobDetail, asyncWorkerTrigger);
+ } catch (SchedulerException e) {
+ logger.error(EELFLoggerDelegate.errorLogger, "Failed to schedule trigger for async worker jobs: {}", e.getMessage());
+ throw new GenericUncheckedException(e);
+ }
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java
new file mode 100644
index 000000000..aa94a2aa0
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java
@@ -0,0 +1,135 @@
+package org.onap.vid.job.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobCommand;
+import org.onap.vid.job.JobsBrokerService;
+import org.onap.vid.job.NextCommand;
+import org.onap.vid.job.command.JobCommandFactory;
+import org.onap.vid.properties.Features;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.quartz.JobExecutionContext;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+import org.springframework.stereotype.Component;
+import org.togglz.core.manager.FeatureManager;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.onap.vid.job.Job.JobStatus.FAILED;
+import static org.onap.vid.job.Job.JobStatus.STOPPED;
+
+@Component
+public class JobWorker extends QuartzJobBean {
+
+ private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class);
+
+ private JobsBrokerService jobsBrokerService;
+ private FeatureManager featureManager;
+ private JobCommandFactory jobCommandFactory;
+ private Job.JobStatus topic;
+
+ @Override
+ protected void executeInternal(JobExecutionContext context) {
+ Optional<Job> job;
+
+ if (!isMsoNewApiActive()) {
+ return;
+ }
+
+ job = pullJob();
+
+ while (job.isPresent()) {
+ Job nextJob = executeJobAndGetNext(job.get());
+ pushBack(nextJob);
+
+ job = pullJob();
+ }
+ }
+
+ private Optional<Job> pullJob() {
+ try {
+ return jobsBrokerService.pull(topic, UUID.randomUUID().toString());
+ } catch (Exception e) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e);
+ return Optional.empty();
+ }
+ }
+
+ private void pushBack(Job nextJob) {
+ try {
+ jobsBrokerService.pushBack(nextJob);
+ } catch (Exception e) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e);
+ }
+ }
+
+ protected Job executeJobAndGetNext(Job job) {
+ LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}",
+ StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
+ StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
+ job.getStatus(), job.getType());
+
+ NextCommand nextCommand = executeCommandAndGetNext(job);
+
+ Job nextJob = setNextCommandInJob(nextCommand, job);
+
+ return nextJob;
+ }
+
+ private NextCommand executeCommandAndGetNext(Job job) {
+ NextCommand nextCommand;
+ try {
+ final JobCommand jobCommand = jobCommandFactory.toCommand(job);
+ nextCommand = jobCommand.call();
+ } catch (Exception e) {
+ LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e);
+ nextCommand = new NextCommand(FAILED);
+ }
+
+ if (nextCommand == null) {
+ nextCommand = new NextCommand(STOPPED);
+ }
+ return nextCommand;
+ }
+
+ private Job setNextCommandInJob(NextCommand nextCommand, Job job) {
+ LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}",
+ StringUtils.substring(String.valueOf(job.getUuid()), 0, 8),
+ StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8),
+ job.getStatus(), job.getType(),
+ nextCommand.getStatus(),
+ nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : "");
+
+ job.setStatus(nextCommand.getStatus());
+
+ if (nextCommand.getCommand() != null) {
+ job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData());
+ }
+
+ return job;
+ }
+
+ private boolean isMsoNewApiActive() {
+ return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION);
+ }
+
+
+ //used by quartz to inject JobsBrokerService into the job
+ //see JobSchedulerInitializer
+ public void setJobsBrokerService(JobsBrokerService jobsBrokerService) {
+ this.jobsBrokerService = jobsBrokerService;
+ }
+
+ public void setFeatureManager(FeatureManager featureManager) {
+ this.featureManager = featureManager;
+ }
+
+ public void setJobCommandFactory(JobCommandFactory jobCommandFactory) {
+ this.jobCommandFactory = jobCommandFactory;
+ }
+
+ public void setTopic(Job.JobStatus topic) {
+ this.topic = topic;
+ }
+}
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
new file mode 100644
index 000000000..e286cc4aa
--- /dev/null
+++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java
@@ -0,0 +1,236 @@
+package org.onap.vid.job.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.hibernate.SessionFactory;
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.exceptions.OperationNotAllowedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobsBrokerService;
+import org.onap.vid.properties.VidProperties;
+import org.onap.vid.utils.DaoUtils;
+import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate;
+import org.onap.portalsdk.core.service.DataAccessService;
+import org.onap.portalsdk.core.util.SystemProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.*;
+
+@Service
+public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService {
+
+ static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class);
+
+ private final DataAccessService dataAccessService;
+
+ private final SessionFactory sessionFactory;
+ private int maxOpenedInstantiationRequestsToMso;
+ private int pollingIntervalSeconds;
+
+ @Autowired
+ public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory,
+ @Value("0") int maxOpenedInstantiationRequestsToMso,
+ @Value("10") int pollingIntervalSeconds) {
+ // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration
+ this.dataAccessService = dataAccessService;
+ this.sessionFactory = sessionFactory;
+ this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso;
+ this.pollingIntervalSeconds = pollingIntervalSeconds;
+ }
+
+ @PostConstruct
+ public void configure() {
+ maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS));
+ pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS));
+ }
+
+ public void deleteAll() {
+ dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null);
+ }
+
+ @Override
+ public UUID add(Job job) {
+ final JobDaoImpl jobDao = castToJobDaoImpl(job);
+ jobDao.setUuid(UUID.randomUUID());
+ dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
+ return job.getUuid();
+ }
+
+ @Override
+ public Optional<Job> pull(Job.JobStatus topic, String ownerId) {
+ JobDaoImpl daoJob;
+ int updatedEntities;
+ do {
+ String query = sqlQueryForTopic(topic);
+ List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null);
+ if (jobs.isEmpty()) {
+ return Optional.empty();
+ }
+
+ daoJob = jobs.get(0);
+
+ final UUID uuid = daoJob.getUuid();
+ final Integer age = daoJob.getAge();
+
+ daoJob.setTakenBy(ownerId);
+
+ // It might become that daoJob was taken and pushed-back already, before we
+ // arrived here, so we're verifying the age was not pushed forward.
+ // Age is actually forwarded upon pushBack().
+ String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " +
+ " job.id = :id" +
+ " and job.age = :age" +
+ " and takenBy is null";
+ updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
+ session.createQuery(hqlUpdate)
+ .setText("id", uuid.toString())
+ .setInteger("age", age)
+ .setText("takenBy", ownerId)
+ .executeUpdate());
+
+ } while (updatedEntities == 0);
+
+ return Optional.ofNullable(daoJob);
+ }
+
+ private java.sql.Timestamp nowMinusInterval() {
+ return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds));
+ }
+
+ private String sqlQueryForTopic(Job.JobStatus topic) {
+ switch (topic) {
+ case IN_PROGRESS:
+ return "" +
+ "select * from VID_JOB" +
+ " where" +
+ // select only non-deleted in-progress jobs
+ " JOB_STATUS = 'IN_PROGRESS'" +
+ " and TAKEN_BY is null" +
+ " and DELETED_AT is null" +
+ // give some breath, don't select jos that were recently reached
+ " and MODIFIED_DATE <= '" + nowMinusInterval() +
+ // take the oldest handled one
+ "' order by MODIFIED_DATE ASC" +
+ // select only one result
+ " limit 1";
+
+ case PENDING:
+ return "" +
+ // select only pending jobs
+ "select vid_job.* from VID_JOB " +
+ // select users have in_progress jobs
+ "left join \n" +
+ " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" +
+ "group by user_id) users_have_any_in_progress_job_tbl\n" +
+ "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " +
+ "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" +
+ // job is not deleted
+ " AND DELETED_AT is null and (\n" +
+ // limit in-progress to some amount
+ "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" +
+ "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " +
+ // don't take jobs from templates that already in-progress/failed
+ "and TEMPLATE_Id not in \n" +
+ "(select TEMPLATE_Id from vid_job where" +
+ " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted
+ " or JOB_STATUS='IN_PROGRESS'" +
+ " or TAKEN_BY IS NOT NULL)" + " \n " +
+ // prefer older jobs, but the earlier in each bulk
+ "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " +
+ // select only one result
+ "limit 1";
+ default:
+ throw new GenericUncheckedException("Unsupported topic to pull from: " + topic);
+ }
+ }
+
+
+ private byte[] getUuidAsByteArray(UUID owner) {
+ ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+ bb.putLong(owner.getMostSignificantBits());
+ bb.putLong(owner.getLeastSignificantBits());
+ return bb.array();
+ }
+
+ @Override
+ public void pushBack(Job job) {
+ final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null);
+
+ if (remoteDaoJob == null) {
+ throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()");
+ }
+
+ if (remoteDaoJob.getTakenBy() == null) {
+ throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back.");
+ }
+
+ final JobDaoImpl jobDao = castToJobDaoImpl(job);
+
+ jobDao.setTakenBy(null);
+
+ Integer age = jobDao.getAge();
+ jobDao.setAge(age + 1);
+
+ logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType());
+
+ dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap());
+ }
+
+ private JobDaoImpl castToJobDaoImpl(Job job) {
+ if (!(job instanceof JobDaoImpl)) {
+ throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass());
+ }
+ return (JobDaoImpl) job;
+ }
+
+ @Override
+ public Collection<Job> peek() {
+ return dataAccessService.getList(JobDaoImpl.class, null);
+ }
+
+ @Override
+ public Job peek(UUID jobId) {
+ return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
+ }
+
+ @Override
+ public void delete(UUID jobId) {
+ int updatedEntities;
+ Date now = new Date();
+
+ String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " +
+ " job.id = :id" +
+ " and job.status in(:pending, :stopped)" +
+ " and takenBy is null";
+
+ updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session ->
+ session.createQuery(hqlUpdate)
+ .setTimestamp("now", now)
+ .setText("id", jobId.toString())
+ .setText("pending", Job.JobStatus.PENDING.toString())
+ .setText("stopped", Job.JobStatus.STOPPED.toString())
+ .executeUpdate());
+
+ if (updatedEntities == 0) {
+ final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null);
+
+ if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) {
+ logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId);
+ throw new OperationNotAllowedException("Service does not exist");
+ }
+
+ if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) {
+ logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() +
+ ", takenBy " + remoteDaoJob.getTakenBy());
+ throw new OperationNotAllowedException("Service status does not allow deletion from the queue");
+ }
+
+ throw new OperationNotAllowedException("Service deletion failed");
+ }
+ }
+}