diff options
Diffstat (limited to 'vid-app-common/src/main/java/org/onap/vid/job')
17 files changed, 1298 insertions, 0 deletions
diff --git a/vid-app-common/src/main/java/org/onap/vid/job/Job.java b/vid-app-common/src/main/java/org/onap/vid/job/Job.java new file mode 100644 index 000000000..77f348dc6 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/Job.java @@ -0,0 +1,47 @@ +package org.onap.vid.job; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.Map; +import java.util.UUID; + +public interface Job { + + UUID getUuid(); + + void setUuid(UUID uuid); + + JobStatus getStatus(); + + void setStatus(JobStatus status); + + @JsonIgnore + Map<String, Object> getData(); + + void setTypeAndData(JobType jobType, Map<String, Object> data); + + UUID getTemplateId(); + + void setTemplateId(UUID templateId); + + void setIndexInBulk(Integer indexInBulk); + + JobType getType(); + + enum JobStatus { + COMPLETED(true), + FAILED(true), + IN_PROGRESS(false), + PAUSE(false), + PENDING(false), + STOPPED(true); + + private final Boolean finalStatus; + public Boolean isFinal(){return finalStatus;} + + JobStatus(Boolean finalStatus) + { + this.finalStatus = finalStatus ; + } + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java b/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java new file mode 100644 index 000000000..1701092b3 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/JobAdapter.java @@ -0,0 +1,26 @@ +package org.onap.vid.job; + +import org.onap.vid.model.JobBulk; +import org.onap.vid.model.JobModel; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * kind of factory for creating jobs and converting them to Job Model + */ +public interface JobAdapter { + JobModel toModel(Job job); + + JobBulk toModelBulk(List<Job> jobList); + + List<Job> createBulkOfJobs(Map<String, Object> bulkRequest); + + Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk); + + // Marks types that are an AsyncJob payload + public interface AsyncJobRequest { + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java new file mode 100644 index 000000000..c32645cad --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/JobCommand.java @@ -0,0 +1,41 @@ +package org.onap.vid.job; + +import java.util.Map; +import java.util.UUID; + + +/** + * A callable instance, with serializable characteristics. + * Represents a step in a chain of steps, which eventualy + * resides into a packing Job. + */ +public interface JobCommand { + + /** + * Initialize the command state + * @param jobUuid Parent job's uuid + * @param data An input to be set into the command. Each implementation may expect different keys in the map. + * @return Returns itself + */ + default JobCommand init(UUID jobUuid, Map<String, Object> data) { + return this; + } + + /** + * @return Returns the inner state of the command. This state, once passed into init(), should + * bring the command back to it's state. + */ + Map<String, Object> getData(); + + /** + * Execute the command represented by this instance. Assumes the instance is already init(). + * @return A NextCommand containing the next command in chain of commands, or null if chain + * should be terminated. Might return itself (packed in a NextCommand). + */ + NextCommand call(); + + default JobType getType() { + return JobType.jobTypeOf(this.getClass()); + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobType.java b/vid-app-common/src/main/java/org/onap/vid/job/JobType.java new file mode 100644 index 000000000..9846d2775 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/JobType.java @@ -0,0 +1,31 @@ +package org.onap.vid.job; + +import org.onap.vid.job.command.*; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public enum JobType { + + HttpCall(HttpCallCommand.class), + AggregateState(AggregateStateCommand.class), + ServiceInstantiation(ServiceInstantiationCommand.class), + InProgressStatus(InProgressStatusCommand.class), + NoOp(NoOpCommand.class); + + private static final Map<Class, JobType> REVERSE_MAP = Stream.of(values()).collect(Collectors.toMap(t -> t.getCommandClass(), t -> t)); + + private final Class commandClass; + + <T extends JobCommand> JobType(Class<T> commandClass) { + this.commandClass = commandClass; + } + + public Class getCommandClass() { + return commandClass; + } + static JobType jobTypeOf(Class commandClass) { + return REVERSE_MAP.get(commandClass); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java b/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java new file mode 100644 index 000000000..856f50b2e --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/JobsBrokerService.java @@ -0,0 +1,21 @@ +package org.onap.vid.job; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; + +public interface JobsBrokerService { + + UUID add(Job job); + + Optional<Job> pull(Job.JobStatus topic, String ownerId); + + void pushBack(Job job); + + Collection<Job> peek(); + + Job peek(UUID jobId); + + void delete(UUID jobId); + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java new file mode 100644 index 000000000..55f375bb7 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/NextCommand.java @@ -0,0 +1,25 @@ +package org.onap.vid.job; + +public class NextCommand { + private final Job.JobStatus status; + private final JobCommand command; + + public NextCommand(Job.JobStatus nextStatus, JobCommand nextCommand) { + this.status = nextStatus; + this.command = nextCommand; + } + + public NextCommand(Job.JobStatus nextStatus) { + this.status = nextStatus; + this.command = null; + } + + public Job.JobStatus getStatus() { + return status; + } + + public JobCommand getCommand() { + return command; + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java new file mode 100644 index 000000000..65aadf3be --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/AggregateStateCommand.java @@ -0,0 +1,26 @@ +package org.onap.vid.job.command; + +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.NextCommand; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.Map; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class AggregateStateCommand implements JobCommand { + + @Override + public NextCommand call() { + return null; + } + + @Override + public Map<String, Object> getData() { + return Collections.emptyMap(); + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java new file mode 100644 index 000000000..5951d7c83 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/HttpCallCommand.java @@ -0,0 +1,52 @@ +package org.onap.vid.job.command; + +import com.google.common.collect.ImmutableMap; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.NextCommand; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Response; +import java.util.Map; +import java.util.UUID; + + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class HttpCallCommand implements JobCommand { + private String url; + private UUID uuid; + + public HttpCallCommand() { + } + + public HttpCallCommand(String url, UUID uuid) { + init(url, uuid); + } + + @Override + public NextCommand call() { + final Response response = ClientBuilder.newClient().target(url).request().post(Entity.text(uuid.toString())); + return new NextCommand(Job.JobStatus.COMPLETED); + } + + @Override + public HttpCallCommand init(UUID jobUuid, Map<String, Object> data) { + return init((String) data.get("url"), jobUuid); + } + + private HttpCallCommand init(String url, UUID jobUuid) { + this.url = url; + this.uuid = jobUuid; + return this; + } + + @Override + public Map<String, Object> getData() { + return ImmutableMap.of("url", url); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java new file mode 100644 index 000000000..64c782c00 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/InProgressStatusCommand.java @@ -0,0 +1,110 @@ +package org.onap.vid.job.command; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.onap.vid.job.Job.JobStatus; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.NextCommand; +import org.onap.vid.mso.RestMsoImplementation; +import org.onap.vid.mso.RestObject; +import org.onap.vid.mso.rest.AsyncRequestStatus; +import org.onap.vid.services.AsyncInstantiationBusinessLogic; +import org.onap.vid.services.AuditService; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Map; +import java.util.UUID; + + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class InProgressStatusCommand implements JobCommand { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(InProgressStatusCommand.class); + + @Inject + private AsyncInstantiationBusinessLogic asyncInstantiationBL; + + @Inject + private RestMsoImplementation restMso; + + @Inject + private AuditService auditService; + + private String requestId; + + private UUID jobUuid; + + public InProgressStatusCommand() { + } + + InProgressStatusCommand(UUID jobUuid, String requestId) { + init(jobUuid, requestId); + } + + @Override + public NextCommand call() { + + try { + String path = asyncInstantiationBL.getOrchestrationRequestsPath()+"/"+requestId; + RestObject<AsyncRequestStatus> msoResponse = restMso.GetForObject("", path, AsyncRequestStatus.class); + JobStatus jobStatus; + if (msoResponse.getStatusCode() >= 400 || msoResponse.get() == null) { + auditService.setFailedAuditStatusFromMso(jobUuid, requestId, msoResponse.getStatusCode(), msoResponse.getRaw()); + LOGGER.error(EELFLoggerDelegate.errorLogger, + "Failed to get orchestration status for {}. Status code: {}, Body: {}", + requestId, msoResponse.getStatusCode(), msoResponse.getRaw()); + return new NextCommand(JobStatus.IN_PROGRESS, this); + } + else { + jobStatus = asyncInstantiationBL.calcStatus(msoResponse.get()); + } + + asyncInstantiationBL.auditMsoStatus(jobUuid,msoResponse.get().request); + + + if (jobStatus == JobStatus.FAILED) { + asyncInstantiationBL.handleFailedInstantiation(jobUuid); + } + else { + asyncInstantiationBL.updateServiceInfoAndAuditStatus(jobUuid, jobStatus); + } + //in case of JobStatus.PAUSE we leave the job itself as IN_PROGRESS, for keep tracking job progress + if (jobStatus == JobStatus.PAUSE) { + return new NextCommand(JobStatus.IN_PROGRESS, this); + } + return new NextCommand(jobStatus, this); + } catch (javax.ws.rs.ProcessingException e) { + // Retry when we can't connect MSO during getStatus + LOGGER.error(EELFLoggerDelegate.errorLogger, "Cannot get orchestration status for {}, will retry: {}", requestId, e, e); + return new NextCommand(JobStatus.IN_PROGRESS, this); + } catch (RuntimeException e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "Cannot get orchestration status for {}, stopping: {}", requestId, e, e); + return new NextCommand(JobStatus.STOPPED, this); + } + } + + @Override + public InProgressStatusCommand init(UUID jobUuid, Map<String, Object> data) { + return init(jobUuid, (String) data.get("requestId")); + } + + private InProgressStatusCommand init(UUID jobUuid, String requestId) { + this.requestId = requestId; + this.jobUuid = jobUuid; + return this; + } + + @Override + public Map<String, Object> getData() { + return ImmutableMap.of("requestId", requestId); + } + + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java b/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java new file mode 100644 index 000000000..1e613c58b --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/JobCommandFactory.java @@ -0,0 +1,43 @@ +package org.onap.vid.job.command; + +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.function.Function; + +@Component +public class JobCommandFactory { + + final Function<Class<? extends JobCommand>, JobCommand> jobFactory; + + @Inject + public JobCommandFactory(ApplicationContext applicationContext) { + this.jobFactory = (jobType -> { + final Object commandBean = applicationContext.getBean(jobType); + + if (!(commandBean instanceof JobCommand)) { + throw new GenericUncheckedException(commandBean.getClass() + " is not a JobCommand"); + } + + return (JobCommand) commandBean; + }); + } + + public JobCommandFactory(Function<Class<? extends JobCommand>, JobCommand> jobFactory) { + this.jobFactory = jobFactory; + } + + public JobCommand toCommand(Job job) { + + final JobCommand command = jobFactory.apply(job.getType().getCommandClass()); + command.init(job.getUuid(), job.getData()); + + return command; + } + + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java new file mode 100644 index 000000000..37636fcaa --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/NoOpCommand.java @@ -0,0 +1,25 @@ +package org.onap.vid.job.command; + +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.NextCommand; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.Map; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class NoOpCommand implements JobCommand { + + @Override + public NextCommand call() { + return null; + } + + @Override + public Map<String, Object> getData() { + return Collections.emptyMap(); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java b/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java new file mode 100644 index 000000000..6afacf23e --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/command/ServiceInstantiationCommand.java @@ -0,0 +1,140 @@ +package org.onap.vid.job.command; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.onap.vid.aai.exceptions.InvalidAAIResponseException; +import org.onap.vid.changeManagement.RequestDetailsWrapper; +import org.onap.vid.exceptions.MaxRetriesException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.NextCommand; +import org.onap.vid.model.RequestReferencesContainer; +import org.onap.vid.model.serviceInstantiation.ServiceInstantiation; +import org.onap.vid.mso.RestMsoImplementation; +import org.onap.vid.mso.RestObject; +import org.onap.vid.mso.model.ServiceInstantiationRequestDetails; +import org.onap.vid.services.AsyncInstantiationBusinessLogic; +import org.onap.vid.services.AuditService; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Map; +import java.util.UUID; + + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class ServiceInstantiationCommand implements JobCommand { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(ServiceInstantiationCommand.class); + + @Inject + private AsyncInstantiationBusinessLogic asyncInstantiationBL; + + @Inject + private AuditService auditService; + + @Inject + private RestMsoImplementation restMso; + + private UUID uuid; + private ServiceInstantiation serviceInstantiationRequest; + private String userId; + + public ServiceInstantiationCommand() { + } + + public ServiceInstantiationCommand(UUID uuid, ServiceInstantiation serviceInstantiationRequest, String userId) { + init(uuid, serviceInstantiationRequest, userId); + } + + @Override + public NextCommand call() { + RequestDetailsWrapper<ServiceInstantiationRequestDetails> requestDetailsWrapper ; + try { + requestDetailsWrapper = asyncInstantiationBL.generateServiceInstantiationRequest( + uuid, serviceInstantiationRequest, userId + ); + } + + //Aai return bad response while checking names uniqueness + catch (InvalidAAIResponseException exception) { + LOGGER.error("Failed to check name uniqueness in AAI. VID will try again later", exception); + //put the job in_progress so we will keep trying to check name uniqueness in AAI + //And then send the request to MSO + return new NextCommand(Job.JobStatus.IN_PROGRESS, this); + } + + //Vid reached to max retries while trying to find unique name in AAI + catch (MaxRetriesException exception) { + LOGGER.error("Failed to find unused name in AAI. Set the job to FAILED ", exception); + return handleCommandFailed(); + } + + String path = asyncInstantiationBL.getServiceInstantiationPath(serviceInstantiationRequest); + + RestObject<RequestReferencesContainer> msoResponse = restMso.PostForObject(requestDetailsWrapper, "", + path, RequestReferencesContainer.class); + + if (msoResponse.getStatusCode() >= 200 && msoResponse.getStatusCode() < 400) { + final Job.JobStatus jobStatus = Job.JobStatus.IN_PROGRESS; + final String requestId = msoResponse.get().getRequestReferences().getRequestId(); + final String instanceId = msoResponse.get().getRequestReferences().getInstanceId(); + asyncInstantiationBL.auditVidStatus(uuid, jobStatus); + setInitialRequestAuditStatusFromMso(requestId); + asyncInstantiationBL.updateServiceInfo(uuid, x-> { + x.setJobStatus(jobStatus); + x.setServiceInstanceId(instanceId); + }); + + return new NextCommand(jobStatus, new InProgressStatusCommand(uuid, requestId)); + } else { + auditService.setFailedAuditStatusFromMso(uuid,null, msoResponse.getStatusCode(),msoResponse.getRaw()); + return handleCommandFailed(); + } + + } + + private void setInitialRequestAuditStatusFromMso(String requestId){ + final String initialMsoRequestStatus = "REQUESTED"; + asyncInstantiationBL.auditMsoStatus(uuid,initialMsoRequestStatus,requestId,null); + } + + protected NextCommand handleCommandFailed() { + asyncInstantiationBL.handleFailedInstantiation(uuid); + return new NextCommand(Job.JobStatus.FAILED); + } + + @Override + public ServiceInstantiationCommand init(UUID jobUuid, Map<String, Object> data) { + final Object request = data.get("request"); + + return init( + jobUuid, + OBJECT_MAPPER.convertValue(request, ServiceInstantiation.class), + (String) data.get("userId") + ); + } + + private ServiceInstantiationCommand init(UUID jobUuid, ServiceInstantiation serviceInstantiationRequest, String userId) { + this.uuid = jobUuid; + this.serviceInstantiationRequest = serviceInstantiationRequest; + this.userId = userId; + + return this; + } + + @Override + public Map<String, Object> getData() { + return ImmutableMap.of( + "uuid", uuid, + "request", serviceInstantiationRequest, + "userId", userId + ); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java new file mode 100644 index 000000000..77e1dd2cf --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobAdapterImpl.java @@ -0,0 +1,72 @@ +package org.onap.vid.job.impl; + +import com.google.common.collect.ImmutableMap; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobAdapter; +import org.onap.vid.job.JobType; +import org.onap.vid.model.JobBulk; +import org.onap.vid.model.JobModel; +import org.springframework.stereotype.Component; + +import javax.ws.rs.BadRequestException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +@Component +public class JobAdapterImpl implements JobAdapter { + + @Override + public JobModel toModel(Job job) { + JobModel jobModel = new JobModel(); + jobModel.setUuid(job.getUuid()); + jobModel.setStatus(job.getStatus()); + jobModel.setTemplateId(job.getTemplateId()); + return jobModel; + } + + @Override + public JobBulk toModelBulk(List<Job> jobList) { + return new JobBulk(jobList.stream().map(this::toModel).collect(Collectors.toList())); + } + + @Override + public Job createJob(JobType jobType, AsyncJobRequest request, UUID templateId, String userId, Integer indexInBulk){ + JobDaoImpl job = new JobDaoImpl(); + job.setStatus(Job.JobStatus.PENDING); + job.setTypeAndData(jobType, ImmutableMap.of( + "request", request, + "userId", userId)); + job.setTemplateId(templateId); + job.setIndexInBulk(indexInBulk); + job.setUserId(userId); + return job; + } + + @Override + public List<Job> createBulkOfJobs(Map<String, Object> bulkRequest) { + int count; + JobType jobType; + + try { + count = (Integer) bulkRequest.get("count"); + jobType = JobType.valueOf((String) bulkRequest.get("type")); + } catch (Exception exception) { + throw new BadRequestException(exception); + } + List<Job> jobList = new ArrayList<>(count + 1); + UUID templateId = UUID.randomUUID(); + for (int i = 0; i < count; i++) { + Job child = new JobDaoImpl(); + child.setTypeAndData(jobType, bulkRequest); + child.setStatus(Job.JobStatus.PENDING); + child.setTemplateId(templateId); + child.setIndexInBulk(i); + jobList.add(child); + } + return jobList; + } + +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java new file mode 100644 index 000000000..1ff1296c7 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobDaoImpl.java @@ -0,0 +1,192 @@ +package org.onap.vid.job.impl; + + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.MoreObjects; +import org.hibernate.annotations.Type; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobType; +import org.onap.vid.model.VidBaseEntity; + +import javax.persistence.*; +import java.io.IOException; +import java.util.*; + +@Entity +@Table(name = "vid_job") +public class JobDaoImpl extends VidBaseEntity implements Job { + + private static ObjectMapper objectMapper = new ObjectMapper(); + private Job.JobStatus status; + private JobType type; + private Map<JobType, Map<String, Object>> data = new TreeMap<>(); + private UUID templateId; + private UUID uuid; + private String takenBy; + private String userId; + private Integer age = 0; + private Integer indexInBulk = 0; + private Date deletedAt; + + @Id + @Column(name = "JOB_ID", columnDefinition = "CHAR(36)") + @Type(type="org.hibernate.type.UUIDCharType") + public UUID getUuid() { + return uuid; + } + + public void setUuid(UUID uuid) { + this.uuid = uuid; + } + + //we use uuid instead id. So making id Transient + @Override + @Transient + @JsonIgnore + public Long getId() { + return this.getUuid().getLeastSignificantBits(); + } + + @Column(name = "JOB_STATUS") + @Enumerated(EnumType.STRING) + public Job.JobStatus getStatus() { + return status; + } + + public void setStatus(Job.JobStatus status) { + this.status = status; + } + + @Enumerated(EnumType.STRING) + @Column(name = "JOB_TYPE") + public JobType getType() { + return type; + } + + public void setType(JobType type) { + this.type = type; + } + + //the columnDefinition is relevant only for UT + @Column(name = "JOB_DATA", columnDefinition = "VARCHAR(30000)") + public String getDataRaw() { + try { + return objectMapper.writeValueAsString(data); + } catch (JsonProcessingException e) { + throw new GenericUncheckedException(e); + } + } + + public void setDataRaw(String data) { + try { + this.data = objectMapper.readValue(data, new TypeReference<Map<JobType, Map<String, Object>>>() { + }); + } catch (IOException e) { + throw new GenericUncheckedException(e); + } + } + + @Transient + public Map<String, Object> getData() { + return data.get(getType()); + } + + @Override + public void setTypeAndData(JobType jobType, Map<String, Object> data) { + // *add* the data to map, + // then change state to given type + this.type = jobType; + this.data.put(jobType, data); + } + + @Column(name = "TAKEN_BY") + public String getTakenBy() { + return takenBy; + } + + public void setTakenBy(String takenBy) { + this.takenBy = takenBy; + } + + @Type(type="org.hibernate.type.UUIDCharType") + @Column(name = "TEMPLATE_ID", columnDefinition = "CHAR(36)") + public UUID getTemplateId() { + return templateId; + } + + @Override + public void setTemplateId(UUID templateId) { + this.templateId = templateId; + } + + @Column(name="INDEX_IN_BULK") + public Integer getIndexInBulk() { + return indexInBulk; + } + + @Override + public void setIndexInBulk(Integer indexInBulk) { + this.indexInBulk = indexInBulk; + } + + @Column(name="USER_ID") + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + @Column(name="AGE") + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } + + @Column(name="DELETED_AT") + public Date getDeletedAt() { + return deletedAt; + } + + public void setDeletedAt(Date deletedAt) { + this.deletedAt = deletedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof JobDaoImpl)) return false; + JobDaoImpl daoJob = (JobDaoImpl) o; + return Objects.equals(getUuid(), daoJob.getUuid()); + } + + @Override + public int hashCode() { + return Objects.hash(getUuid()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("type", type) + .add("templateId", templateId) + .add("uuid", uuid) + .add("takenBy", takenBy) + .add("userId", userId) + .add("age", age) + .add("created", created) + .add("modified", modified) + .add("deletedAt", deletedAt) + .add("data", data) + .toString(); + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java new file mode 100644 index 000000000..a5070fbdf --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobSchedulerInitializer.java @@ -0,0 +1,76 @@ +package org.onap.vid.job.impl; + +import com.google.common.collect.ImmutableMap; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.command.JobCommandFactory; +import org.onap.vid.properties.Features; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Component; +import org.togglz.core.manager.FeatureManager; + +import javax.annotation.PostConstruct; + +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; + +@Component +public class JobSchedulerInitializer { + + private JobsBrokerService jobsBrokerService; + private SchedulerFactoryBean schedulerFactoryBean; + private FeatureManager featureManager; + private JobCommandFactory jobCommandFactory; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobSchedulerInitializer.class); + + @Autowired + public JobSchedulerInitializer( + JobsBrokerService jobsBrokerService, + SchedulerFactoryBean schedulerFactoryBean, + FeatureManager featureManager, + JobCommandFactory JobCommandFactory + ) { + this.jobsBrokerService = jobsBrokerService; + this.schedulerFactoryBean = schedulerFactoryBean; + this.featureManager = featureManager; + this.jobCommandFactory = JobCommandFactory; + + } + + @PostConstruct + public void init() { + if (!featureManager.isActive(Features.FLAG_ASYNC_JOBS)) { + return; + } + scheduleJobWorker(Job.JobStatus.PENDING, 1); + scheduleJobWorker(Job.JobStatus.IN_PROGRESS, 1); + } + + private void scheduleJobWorker(Job.JobStatus topic, int intervalInSeconds) { + Scheduler scheduler = schedulerFactoryBean.getScheduler(); + JobDetail jobDetail = JobBuilder.newJob().ofType(JobWorker.class) + .withIdentity("AsyncWorkersJob" + topic) + .withDescription("Job that run async worker for " + topic) + .setJobData(new JobDataMap(ImmutableMap.of( + "jobsBrokerService", jobsBrokerService, + "jobCommandFactory", jobCommandFactory, + "featureManager", featureManager, + "topic", topic + ))) + .build(); + Trigger asyncWorkerTrigger = TriggerBuilder.newTrigger().forJob(jobDetail) + .withIdentity("AsyncWorkersTrigger" + topic) + .withDescription("Trigger to run async worker for " + topic) + .withSchedule(simpleSchedule().repeatForever().withIntervalInSeconds(intervalInSeconds)) + .build(); + try { + scheduler.scheduleJob(jobDetail, asyncWorkerTrigger); + } catch (SchedulerException e) { + logger.error(EELFLoggerDelegate.errorLogger, "Failed to schedule trigger for async worker jobs: {}", e.getMessage()); + throw new GenericUncheckedException(e); + } + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java new file mode 100644 index 000000000..aa94a2aa0 --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobWorker.java @@ -0,0 +1,135 @@ +package org.onap.vid.job.impl; + +import org.apache.commons.lang3.StringUtils; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobCommand; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.NextCommand; +import org.onap.vid.job.command.JobCommandFactory; +import org.onap.vid.properties.Features; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.quartz.JobExecutionContext; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.stereotype.Component; +import org.togglz.core.manager.FeatureManager; + +import java.util.Optional; +import java.util.UUID; + +import static org.onap.vid.job.Job.JobStatus.FAILED; +import static org.onap.vid.job.Job.JobStatus.STOPPED; + +@Component +public class JobWorker extends QuartzJobBean { + + private static final EELFLoggerDelegate LOGGER = EELFLoggerDelegate.getLogger(JobWorker.class); + + private JobsBrokerService jobsBrokerService; + private FeatureManager featureManager; + private JobCommandFactory jobCommandFactory; + private Job.JobStatus topic; + + @Override + protected void executeInternal(JobExecutionContext context) { + Optional<Job> job; + + if (!isMsoNewApiActive()) { + return; + } + + job = pullJob(); + + while (job.isPresent()) { + Job nextJob = executeJobAndGetNext(job.get()); + pushBack(nextJob); + + job = pullJob(); + } + } + + private Optional<Job> pullJob() { + try { + return jobsBrokerService.pull(topic, UUID.randomUUID().toString()); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed to pull job from queue, breaking: {}", e, e); + return Optional.empty(); + } + } + + private void pushBack(Job nextJob) { + try { + jobsBrokerService.pushBack(nextJob); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "failed pushing back job to queue: {}", e, e); + } + } + + protected Job executeJobAndGetNext(Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "going to execute job {} of {}: {}/{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType()); + + NextCommand nextCommand = executeCommandAndGetNext(job); + + Job nextJob = setNextCommandInJob(nextCommand, job); + + return nextJob; + } + + private NextCommand executeCommandAndGetNext(Job job) { + NextCommand nextCommand; + try { + final JobCommand jobCommand = jobCommandFactory.toCommand(job); + nextCommand = jobCommand.call(); + } catch (Exception e) { + LOGGER.error(EELFLoggerDelegate.errorLogger, "error while executing job from queue: {}", e, e); + nextCommand = new NextCommand(FAILED); + } + + if (nextCommand == null) { + nextCommand = new NextCommand(STOPPED); + } + return nextCommand; + } + + private Job setNextCommandInJob(NextCommand nextCommand, Job job) { + LOGGER.debug(EELFLoggerDelegate.debugLogger, "transforming job {} of {}: {}/{} -> {}{}", + StringUtils.substring(String.valueOf(job.getUuid()), 0, 8), + StringUtils.substring(String.valueOf(job.getTemplateId()), 0, 8), + job.getStatus(), job.getType(), + nextCommand.getStatus(), + nextCommand.getCommand() != null ? ("/" + nextCommand.getCommand().getType()) : ""); + + job.setStatus(nextCommand.getStatus()); + + if (nextCommand.getCommand() != null) { + job.setTypeAndData(nextCommand.getCommand().getType(), nextCommand.getCommand().getData()); + } + + return job; + } + + private boolean isMsoNewApiActive() { + return featureManager.isActive(Features.FLAG_ASYNC_INSTANTIATION); + } + + + //used by quartz to inject JobsBrokerService into the job + //see JobSchedulerInitializer + public void setJobsBrokerService(JobsBrokerService jobsBrokerService) { + this.jobsBrokerService = jobsBrokerService; + } + + public void setFeatureManager(FeatureManager featureManager) { + this.featureManager = featureManager; + } + + public void setJobCommandFactory(JobCommandFactory jobCommandFactory) { + this.jobCommandFactory = jobCommandFactory; + } + + public void setTopic(Job.JobStatus topic) { + this.topic = topic; + } +} diff --git a/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java new file mode 100644 index 000000000..e286cc4aa --- /dev/null +++ b/vid-app-common/src/main/java/org/onap/vid/job/impl/JobsBrokerServiceInDatabaseImpl.java @@ -0,0 +1,236 @@ +package org.onap.vid.job.impl; + +import org.apache.commons.lang3.StringUtils; +import org.hibernate.SessionFactory; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.exceptions.OperationNotAllowedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.properties.VidProperties; +import org.onap.vid.utils.DaoUtils; +import org.onap.portalsdk.core.logging.logic.EELFLoggerDelegate; +import org.onap.portalsdk.core.service.DataAccessService; +import org.onap.portalsdk.core.util.SystemProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.*; + +@Service +public class JobsBrokerServiceInDatabaseImpl implements JobsBrokerService { + + static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(JobsBrokerServiceInDatabaseImpl.class); + + private final DataAccessService dataAccessService; + + private final SessionFactory sessionFactory; + private int maxOpenedInstantiationRequestsToMso; + private int pollingIntervalSeconds; + + @Autowired + public JobsBrokerServiceInDatabaseImpl(DataAccessService dataAccessService, SessionFactory sessionFactory, + @Value("0") int maxOpenedInstantiationRequestsToMso, + @Value("10") int pollingIntervalSeconds) { + // tha @Value will inject conservative defaults; overridden in @PostConstruct from configuration + this.dataAccessService = dataAccessService; + this.sessionFactory = sessionFactory; + this.maxOpenedInstantiationRequestsToMso = maxOpenedInstantiationRequestsToMso; + this.pollingIntervalSeconds = pollingIntervalSeconds; + } + + @PostConstruct + public void configure() { + maxOpenedInstantiationRequestsToMso = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_MAX_OPENED_INSTANTIATION_REQUESTS)); + pollingIntervalSeconds = Integer.parseInt(SystemProperties.getProperty(VidProperties.MSO_ASYNC_POLLING_INTERVAL_SECONDS)); + } + + public void deleteAll() { + dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", null); + } + + @Override + public UUID add(Job job) { + final JobDaoImpl jobDao = castToJobDaoImpl(job); + jobDao.setUuid(UUID.randomUUID()); + dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); + return job.getUuid(); + } + + @Override + public Optional<Job> pull(Job.JobStatus topic, String ownerId) { + JobDaoImpl daoJob; + int updatedEntities; + do { + String query = sqlQueryForTopic(topic); + List<JobDaoImpl> jobs = dataAccessService.executeSQLQuery(query, JobDaoImpl.class, null); + if (jobs.isEmpty()) { + return Optional.empty(); + } + + daoJob = jobs.get(0); + + final UUID uuid = daoJob.getUuid(); + final Integer age = daoJob.getAge(); + + daoJob.setTakenBy(ownerId); + + // It might become that daoJob was taken and pushed-back already, before we + // arrived here, so we're verifying the age was not pushed forward. + // Age is actually forwarded upon pushBack(). + String hqlUpdate = "update JobDaoImpl job set job.takenBy = :takenBy where " + + " job.id = :id" + + " and job.age = :age" + + " and takenBy is null"; + updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> + session.createQuery(hqlUpdate) + .setText("id", uuid.toString()) + .setInteger("age", age) + .setText("takenBy", ownerId) + .executeUpdate()); + + } while (updatedEntities == 0); + + return Optional.ofNullable(daoJob); + } + + private java.sql.Timestamp nowMinusInterval() { + return Timestamp.valueOf(LocalDateTime.now().minusSeconds(pollingIntervalSeconds)); + } + + private String sqlQueryForTopic(Job.JobStatus topic) { + switch (topic) { + case IN_PROGRESS: + return "" + + "select * from VID_JOB" + + " where" + + // select only non-deleted in-progress jobs + " JOB_STATUS = 'IN_PROGRESS'" + + " and TAKEN_BY is null" + + " and DELETED_AT is null" + + // give some breath, don't select jos that were recently reached + " and MODIFIED_DATE <= '" + nowMinusInterval() + + // take the oldest handled one + "' order by MODIFIED_DATE ASC" + + // select only one result + " limit 1"; + + case PENDING: + return "" + + // select only pending jobs + "select vid_job.* from VID_JOB " + + // select users have in_progress jobs + "left join \n" + + " (select user_Id, 1 as has_any_in_progress_job from VID_JOB where JOB_STATUS = 'IN_PROGRESS' or TAKEN_BY IS NOT NULL \n" + + "group by user_id) users_have_any_in_progress_job_tbl\n" + + "on vid_job.user_id = users_have_any_in_progress_job_tbl.user_id " + + "where JOB_STATUS = 'PENDING' and TAKEN_BY is null" + + // job is not deleted + " AND DELETED_AT is null and (\n" + + // limit in-progress to some amount + "select sum(CASE WHEN JOB_STATUS='IN_PROGRESS' or (JOB_STATUS='PENDING' and TAKEN_BY IS NOT NULL) THEN 1 ELSE 0 END) as in_progress\n" + + "from VID_JOB ) <" + maxOpenedInstantiationRequestsToMso + " \n " + + // don't take jobs from templates that already in-progress/failed + "and TEMPLATE_Id not in \n" + + "(select TEMPLATE_Id from vid_job where" + + " (JOB_STATUS='FAILED' and DELETED_AT is null)" + // failed but not deleted + " or JOB_STATUS='IN_PROGRESS'" + + " or TAKEN_BY IS NOT NULL)" + " \n " + + // prefer older jobs, but the earlier in each bulk + "order by has_any_in_progress_job, CREATED_DATE, INDEX_IN_BULK " + + // select only one result + "limit 1"; + default: + throw new GenericUncheckedException("Unsupported topic to pull from: " + topic); + } + } + + + private byte[] getUuidAsByteArray(UUID owner) { + ByteBuffer bb = ByteBuffer.wrap(new byte[16]); + bb.putLong(owner.getMostSignificantBits()); + bb.putLong(owner.getLeastSignificantBits()); + return bb.array(); + } + + @Override + public void pushBack(Job job) { + final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, job.getUuid(), null); + + if (remoteDaoJob == null) { + throw new IllegalStateException("Can push back only pulled jobs. Add new jobs using add()"); + } + + if (remoteDaoJob.getTakenBy() == null) { + throw new IllegalStateException("Can push back only pulled jobs. This one already pushed back."); + } + + final JobDaoImpl jobDao = castToJobDaoImpl(job); + + jobDao.setTakenBy(null); + + Integer age = jobDao.getAge(); + jobDao.setAge(age + 1); + + logger.debug(EELFLoggerDelegate.debugLogger, "{}/{}", jobDao.getStatus(), jobDao.getType()); + + dataAccessService.saveDomainObject(jobDao, DaoUtils.getPropsMap()); + } + + private JobDaoImpl castToJobDaoImpl(Job job) { + if (!(job instanceof JobDaoImpl)) { + throw new UnsupportedOperationException("Can't add " + job.getClass() + " to " + this.getClass()); + } + return (JobDaoImpl) job; + } + + @Override + public Collection<Job> peek() { + return dataAccessService.getList(JobDaoImpl.class, null); + } + + @Override + public Job peek(UUID jobId) { + return (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null); + } + + @Override + public void delete(UUID jobId) { + int updatedEntities; + Date now = new Date(); + + String hqlUpdate = "update JobDaoImpl job set job.deletedAt = :now where " + + " job.id = :id" + + " and job.status in(:pending, :stopped)" + + " and takenBy is null"; + + updatedEntities = DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> + session.createQuery(hqlUpdate) + .setTimestamp("now", now) + .setText("id", jobId.toString()) + .setText("pending", Job.JobStatus.PENDING.toString()) + .setText("stopped", Job.JobStatus.STOPPED.toString()) + .executeUpdate()); + + if (updatedEntities == 0) { + final JobDaoImpl remoteDaoJob = (JobDaoImpl) dataAccessService.getDomainObject(JobDaoImpl.class, jobId, null); + + if (remoteDaoJob == null || remoteDaoJob.getUuid() == null) { + logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service does not exist", jobId); + throw new OperationNotAllowedException("Service does not exist"); + } + + if (!remoteDaoJob.getStatus().equals(Job.JobStatus.PENDING) && !remoteDaoJob.getStatus().equals(Job.JobStatus.STOPPED) || !StringUtils.isEmpty(remoteDaoJob.getTakenBy()) ) { + logger.debug(EELFLoggerDelegate.debugLogger,"jobId {}: Service status does not allow deletion from the queue, status = {}", jobId, remoteDaoJob.getStatus() + + ", takenBy " + remoteDaoJob.getTakenBy()); + throw new OperationNotAllowedException("Service status does not allow deletion from the queue"); + } + + throw new OperationNotAllowedException("Service deletion failed"); + } + } +} |