aboutsummaryrefslogtreecommitdiffstats
path: root/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java')
-rw-r--r--vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java1287
1 files changed, 673 insertions, 614 deletions
diff --git a/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java b/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java
index ff4b34f5f..3686dc26e 100644
--- a/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java
+++ b/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java
@@ -1,616 +1,675 @@
package org.onap.vid.services;
-//
-//import com.google.common.collect.ImmutableList;
-//import com.google.common.collect.ImmutableMap;
-//import org.apache.commons.lang.RandomStringUtils;
-//import org.apache.commons.lang3.RandomUtils;
-//import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-//import org.apache.commons.lang3.builder.ToStringStyle;
-//import org.hibernate.SessionFactory;
-//import org.onap.vid.exceptions.GenericUncheckedException;
-//import org.onap.vid.exceptions.OperationNotAllowedException;
-//import org.onap.vid.job.Job;
-//import org.onap.vid.job.JobAdapter;
-//import org.onap.vid.job.JobType;
-//import org.onap.vid.job.JobsBrokerService;
-//import org.onap.vid.job.impl.JobDaoImpl;
-//import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
-//import org.onap.vid.utils.DaoUtils;
-//import org.onap.vid.config.DataSourceConfig;
-//import org.onap.vid.config.JobAdapterConfig;
-//import org.onap.portalsdk.core.domain.support.DomainVo;
-//import org.onap.portalsdk.core.service.DataAccessService;
-//import org.onap.portalsdk.core.util.SystemProperties;
-//import org.springframework.test.context.ContextConfiguration;
-//import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-//import org.testng.Assert;
-//import org.testng.annotations.AfterMethod;
-//import org.testng.annotations.BeforeMethod;
-//import org.testng.annotations.DataProvider;
-//import org.testng.annotations.Test;
-//
-//import javax.inject.Inject;
-//import java.lang.reflect.Method;
-//import java.time.LocalDateTime;
-//import java.time.ZoneId;
-//import java.util.*;
-//import java.util.concurrent.*;
-//import java.util.stream.Collectors;
-//import java.util.stream.IntStream;
-//import java.util.stream.Stream;
-//
-//import static java.util.concurrent.TimeUnit.MILLISECONDS;
-//import static org.hamcrest.CoreMatchers.equalTo;
-//import static org.hamcrest.CoreMatchers.is;
-//import static org.hamcrest.MatcherAssert.assertThat;
-//import static org.hamcrest.Matchers.both;
-//import static org.hamcrest.Matchers.containsInAnyOrder;
-//import static org.onap.vid.job.Job.JobStatus.*;
-//import static org.onap.vid.utils.Streams.not;
-//import static org.testng.Assert.assertNotNull;
-//import static org.testng.AssertJUnit.assertEquals;
-//
-//@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
-//public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
-//
-// private static final int JOBS_COUNT = 127;
-// private static final boolean DELETED = true;
-// private final ExecutorService executor = Executors.newFixedThreadPool(90);
-//
-// private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
-//
-// private final long FEW = 500;
-//
-// private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
-// private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
-// private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
-// private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
-// private JobsBrokerService broker;
-//
-// @Inject
-// JobAdapter jobAdapter;
-// @Inject
-// private DataAccessService dataAccessService;
-// @Inject
-// private SessionFactory sessionFactory;
-//
-// /*
-// - pulling jobs is limited to inserted ones
-// - putting back allows getting the job again
-// - multi threads safety
-// - any added job should be visible with view
-//
-// - edges:
-// - pulling with empty repo should return empty optional
-// - pulling more than expected should return empty optional
-// - putting one, over-pulling from a different thread
-// - take before inserting, then insert while waiting
-//
-// */
-//
-// private class NoJobException extends RuntimeException {
-// }
-//
-// private Future<Job> newJobAsync(JobsBrokerService b) {
-// return newJobAsync(b, createMockJob("user id"));
-// }
-//
-// private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
-// return newJobAsync(b, createMockJob("user id", status));
-// }
-//
-// private Job createMockJob(String userId) {
-// return jobAdapter.createJob(
-// JobType.NoOp,
-// new JobAdapter.AsyncJobRequest() {
-// public int nothing = 42;
-// },
-// UUID.randomUUID(),
-// userId,
-// RandomUtils.nextInt());
-// }
-//
-// private Job createMockJob(String userId, Job.JobStatus jobStatus) {
-// Job job = createMockJob(userId);
-// job.setStatus(jobStatus);
-// return job;
-// }
-//
-// private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
-// final Future<Job> jobFuture = executor.submit(() -> {
-// accountThreadId();
-//
-// b.add(job);
-//
-// return job;
-// });
-// return jobFuture;
-// }
-//
-// private void pushBackJobAsync(JobsBrokerService b, Job job) {
-// executor.submit(() -> {
-// accountThreadId();
-// b.pushBack(job);
-// return job;
-// });
-// }
-//
-// private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
-// final Future<Optional<Job>> job = executor.submit(() -> {
-// accountThreadId();
-// // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
-// return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
-// });
-// return job;
-// }
-//
-// private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
-// try {
-// return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
-// } catch (TimeoutException | InterruptedException | ExecutionException e) {
-// throw new RuntimeException(e);
-// }
-// }
-//
-// private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
-// try {
-// return retrievedJobFuture.get(FEW, MILLISECONDS);
-// } catch (TimeoutException | InterruptedException | ExecutionException e) {
-// throw new RuntimeException(e);
-// }
-// }
-//
-// private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
-// final List<Job> originalJobs = putALotOfJobs(broker);
-// final List<Job> retrievedJobs = getAlotOfJobs(broker);
-//
-// assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
-//
-// return retrievedJobs;
-// }
-//
-// private List<Job> putALotOfJobs(JobsBrokerService broker) {
-// int n = JOBS_COUNT;
-// return IntStream.range(0, n)
-// .mapToObj(i -> newJobAsync(broker))
-// .map(this::waitForFutureJob)
-// .collect(Collectors.toList());
-// }
-//
-// private List<Job> getAlotOfJobs(JobsBrokerService broker) {
-// int n = JOBS_COUNT;
-// return IntStream.range(0, n)
-// .mapToObj(i -> pullJobAsync(broker))
-// .map(this::waitForFutureOptionalJob)
-// .collect(Collectors.toList());
-// }
-//
-// private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
-// jobs.forEach(job -> pushBackJobAsync(broker, job));
-// }
-//
-// private void accountThreadId() {
-// threadsIds.add(Thread.currentThread().getId());
-// }
-//
-// @AfterMethod
-// public void threadsCounter() {
-// System.out.println("participating threads count: " + threadsIds.size());
-// threadsIds.clear();
-// }
-//
-// @BeforeMethod
-// public void initializeBroker() {
-// broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
-// ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
-// }
-//
-// @Test
-// public void givenSingleJob_getIt_verifySameJob() {
-// final Job originalJob = waitForFutureJob(newJobAsync(broker));
-//
-// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
-// assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
-// }
-//
-// @Test
-// public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
-// final List<Job> originalJobs = putALotOfJobs(broker);
-//
-// MILLISECONDS.sleep(FEW);
-// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
-//
-// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
-//
-// MILLISECONDS.sleep(FEW);
-// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
-//
-// pushBackJobAsync(broker, retrievedJob);
-//
-// MILLISECONDS.sleep(FEW);
-// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
-// }
-//
-// @Test
-// public void givenManyJobs_getThemAll_verifySameJobs() {
-// putAndGetALotOfJobs(broker);
-// }
-//
-// @Test
-// public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
-// final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
-//
-// pushBackJobs(retrievedJobs1, broker);
-// final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
-//
-// assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
-// }
-//
-// private static Date toDate(LocalDateTime localDateTime) {
-// return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
-// }
-//
-// private void setModifiedDateToJob(UUID jobUuid, Date date) {
-// DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
-// job.setModified(date);
-// DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
-// session.saveOrUpdate(job);
-// return 1;
-// });
-// }
-//
-//
-// public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
-// return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
-// }
-//
-// public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
-// JobDaoImpl job = new JobDaoImpl();
-// job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
-// job.setIndexInBulk(indexInBulk);
-// job.setTemplateId(templateId);
-// job.setType(JobType.NoOp);
-// job.setStatus(status);
-// job.setTakenBy(takenBy);
-// job.setCreated(toDate(date));
-// job.setModified(toDate(date));
-// job.setUserId(userId);
-// if (deleted) {
-// job.setDeletedAt(new Date());
-// }
-// return job;
-// }
-//
-// @DataProvider
-// public static Object[][] jobs(Method test) {
-// LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
-// UUID sameTemplate = UUID.randomUUID();
-// return new Object[][]{
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
-// createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
-// createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
-// 4,
-// 0,
-// PENDING,
-// "Broker should pull the first pending job by oldest date then by job index"
-// },
-// { ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
-// createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
-// createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
-// createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
-// 6,
-// 5,
-// PENDING,
-// "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 2,
-// -1,
-// PENDING,
-// "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 2,
-// -1,
-// PENDING,
-// "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// 2,
-// PENDING,
-// "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
-// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// -1,
-// PENDING,
-// "Broker should not pull any job when there is another job from this template that was taken"
-// },
-// {ImmutableList.of(
-// createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
-// createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
-// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// -1,
-// PENDING,
-// "Broker should not pull any job when there is another job from this template that in progress"
-// },
-// {ImmutableList.of(
-// createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
-// createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
-// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// -1,
-// PENDING,
-// "Broker should not pull any job when there is another job from this template that was failed"
-// },
-// {ImmutableList.of(
-// createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
-// createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
-// createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
-// 3,
-// 2,
-// PENDING,
-// "Broker should pull pending job when there is another job from this template that was deleted, although failed"
-// },
-// { ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// 2,
-// PENDING,
-// "Broker should prioritize jobs of user that has no in-progress jobs"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
-// 3,
-// 2,
-// PENDING,
-// "Broker should prioritize jobs of user that has no taken jobs"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
-// createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
-// createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
-// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
-// 5,
-// 4,
-// PENDING,
-// "Broker should take oldest job when there is one in-progress job to each user"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
-// createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
-// createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
-// 2,
-// -1,
-// PENDING,
-// "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
-// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
-// createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
-// 20,
-// 1,
-// IN_PROGRESS,
-// "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
-// createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
-// createNewJob(13, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
-// createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate, DELETED),createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
-// createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS,null, LocalDateTime.now().minusHours(2))),
-// 20,
-// 5,
-// IN_PROGRESS,
-// "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore deleted,completed, failed, pending and stopped statuses"
-// },
-// {ImmutableList.of(
-// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
-// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
-// createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
-// 20,
-// -1,
-// IN_PROGRESS,
-// "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
-// }
-//
-// };
-// }
-//
-//
-// @Test(dataProvider = "jobs")
-// public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<JobDaoImpl> jobs, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
-// JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
-// for (JobDaoImpl job : jobs) {
-// Date modifiedDate = job.getModified();
-// broker.add(job);
-// setModifiedDateToJob(job.getUuid(), modifiedDate);
-// }
-// Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
-// boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
-// Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason);
-// if (shouldAnyBeSelected) {
-// Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
-// }
-// }
-//
-// @DataProvider
-// public Object[][] topics() {
-// return Arrays.stream(Job.JobStatus.values())
-// .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS).contains(t)))
-// .map(v -> new Object[]{v}).collect(Collectors.toList()).toArray(new Object[][]{});
-// }
-//
-// @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
-// public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
-// broker.pull(topic, UUID.randomUUID().toString());
-// }
-//
-// @Test(expectedExceptions = NoJobException.class)
-// public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
-// Stream.of(Job.JobStatus.values())
-// .filter(not(s -> s.equals(PENDING)))
-// .map(s -> createMockJob("some user id", s))
-// .map(job -> newJobAsync(broker, job))
-// .map(this::waitForFutureJob)
-// .collect(Collectors.toList());
-//
-// waitForFutureOptionalJob(pullJobAsync(broker));
-// }
-//
-// @Test
-// public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
-// newJobAsync(broker); // this negated the expected result of the call below
-// givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
-// }
-//
-// @Test(expectedExceptions = NoJobException.class)
-// public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
-// putAndGetALotOfJobs(broker);
-// waitForFutureOptionalJob(pullJobAsync(broker));
-// }
-//
-// @Test(expectedExceptions = NoJobException.class)
-// public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
-// final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
-// assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
-// waitForFutureOptionalJob(futureOptionalJob);
-// }
-//
-// @Test(expectedExceptions = IllegalStateException.class)
-// public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
-// waitForFutureJob(newJobAsync(broker));
-// waitForFutureJob(newJobAsync(broker));
-// waitForFutureOptionalJob(pullJobAsync(broker));
-//
-// Job myJob = createMockJob("user id");
-// myJob.setUuid(UUID.randomUUID());
-//
-// broker.pushBack(myJob); //Should fail
-// }
-//
-// @Test
-// public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
-// final ImmutableMap<String, Object> randomDataForMostRecentJobType =
-// ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
-//
-// waitForFutureJob(newJobAsync(broker));
-// final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
-//
-// job.setStatus(Job.JobStatus.PENDING);
-// job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
-// job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
-// job.setTypeAndData(JobType.ServiceInstantiation, randomDataForMostRecentJobType);
-//
-// broker.pushBack(job);
-// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
-//
-// assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
-// assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
-// assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
-// }
-//
-// private static String jobDataReflected(Job job) {
-// return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
-// .setExcludeFieldNames("created", "modified", "takenBy")
-// .toString();
-// }
-//
-// @Test(expectedExceptions = IllegalStateException.class)
-// public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
-// waitForFutureJob(newJobAsync(broker));
-// final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
-//
-// broker.pushBack(job);
-// broker.pushBack(job); //Should fail
-// }
-//
-// @Test
-// public void addJob_PeekItById_verifySameJobWasPeeked() {
-// String userId = UUID.randomUUID().toString();
-// Job myJob = createMockJob(userId);
-// UUID uuid = broker.add(myJob);
-// Job peekedJob = broker.peek(uuid);
-// assertEquals("added testId is not the same as peeked TestsId",
-// userId,
-// peekedJob.getData().get("userId"));
-// }
-//
-// @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
-// public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
-// final Job job = waitForFutureJob(newJobAsync(broker, status));
-// broker.delete(job.getUuid());
-// assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
-// waitForFutureOptionalJob(pullJobAsync(broker));
-// }
-//
-// @DataProvider
-// public static Object[][] jobStatusesForSuccessDelete() {
-// return new Object[][]{
-// {PENDING},
-// {STOPPED}
-// };
-// }
-//
-// @Test(
-// dataProvider = "jobStatusesForFailedDelete",
-// expectedExceptions = OperationNotAllowedException.class,
-// expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
-// )
-// public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
-// final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
-//
-// if (taken) {
-// waitForFutureOptionalJob(pullJobAsync(broker));
-// }
-//
-//
-// broker.delete(job.getUuid());
-// }
-//
-// @DataProvider
-// public static Object[][] jobStatusesForFailedDelete() {
-// return new Object[][]{
-// {PENDING, true},
-// {IN_PROGRESS, false},
-// {COMPLETED, false},
-// {PAUSE, false},
-// {FAILED, false},
-// };
-// }
-//
-// @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
-// public void deleteJob_notExist_exceptionIsThrown() {
-// waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
-// broker.delete(new UUID(111, 111));
-// }
-//
-//}
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.hibernate.SessionFactory;
+import org.onap.portalsdk.core.domain.support.DomainVo;
+import org.onap.portalsdk.core.service.DataAccessService;
+import org.onap.portalsdk.core.util.SystemProperties;
+import org.onap.vid.config.DataSourceConfig;
+import org.onap.vid.config.JobAdapterConfig;
+import org.onap.vid.exceptions.GenericUncheckedException;
+import org.onap.vid.exceptions.OperationNotAllowedException;
+import org.onap.vid.job.Job;
+import org.onap.vid.job.JobAdapter;
+import org.onap.vid.job.JobType;
+import org.onap.vid.job.JobsBrokerService;
+import org.onap.vid.job.command.JobCommandFactoryTest;
+import org.onap.vid.job.impl.JobDaoImpl;
+import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
+import org.onap.vid.utils.DaoUtils;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.lang.reflect.Method;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.onap.vid.job.Job.JobStatus.*;
+import static org.onap.vid.utils.Streams.not;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+
+@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
+public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
+
+ private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
+
+ private static final int JOBS_COUNT = 127;
+ private static final boolean DELETED = true;
+ private final ExecutorService executor = Executors.newFixedThreadPool(90);
+
+ private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
+
+ private final long FEW = 500;
+
+ private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
+ private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
+ private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
+ private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
+ private JobsBrokerService broker;
+
+ @Inject
+ JobAdapter jobAdapter;
+ @Inject
+ private DataAccessService dataAccessService;
+ @Inject
+ private SessionFactory sessionFactory;
+
+ /*
+ - pulling jobs is limited to inserted ones
+ - putting back allows getting the job again
+ - multi threads safety
+ - any added job should be visible with view
+
+ - edges:
+ - pulling with empty repo should return empty optional
+ - pulling more than expected should return empty optional
+ - putting one, over-pulling from a different thread
+ - take before inserting, then insert while waiting
+
+ */
+
+ private class NoJobException extends RuntimeException {
+ }
+
+ private Future<Job> newJobAsync(JobsBrokerService b) {
+ return newJobAsync(b, createMockJob("user id"));
+ }
+
+ private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
+ return newJobAsync(b, createMockJob("user id", status));
+ }
+
+ private Job createMockJob(String userId) {
+ return jobAdapter.createServiceInstantiationJob(
+ JobType.NoOp,
+ new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
+ UUID.randomUUID(),
+ userId,
+ "optimisticUniqueServiceInstanceName",
+ RandomUtils.nextInt());
+ }
+
+ private Job createMockJob(String userId, Job.JobStatus jobStatus) {
+ Job job = createMockJob(userId);
+ job.setStatus(jobStatus);
+ return job;
+ }
+
+ private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
+ final Future<Job> jobFuture = executor.submit(() -> {
+ accountThreadId();
+
+ b.add(job);
+
+ return job;
+ });
+ return jobFuture;
+ }
+
+ private void pushBackJobAsync(JobsBrokerService b, Job job) {
+ executor.submit(() -> {
+ accountThreadId();
+ b.pushBack(job);
+ return job;
+ });
+ }
+
+ private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
+ final Future<Optional<Job>> job = executor.submit(() -> {
+ accountThreadId();
+ // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
+ return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
+ });
+ return job;
+ }
+
+ private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
+ try {
+ return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
+ try {
+ return retrievedJobFuture.get(FEW, MILLISECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
+ final List<Job> originalJobs = putALotOfJobs(broker);
+ final List<Job> retrievedJobs = getAlotOfJobs(broker);
+
+ assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
+
+ return retrievedJobs;
+ }
+
+ private List<Job> putALotOfJobs(JobsBrokerService broker) {
+ int n = JOBS_COUNT;
+ return IntStream.range(0, n)
+ .mapToObj(i -> newJobAsync(broker))
+ .map(this::waitForFutureJob)
+ .collect(toList());
+ }
+
+ private List<Job> getAlotOfJobs(JobsBrokerService broker) {
+ int n = JOBS_COUNT;
+ return IntStream.range(0, n)
+ .mapToObj(i -> pullJobAsync(broker))
+ .map(this::waitForFutureOptionalJob)
+ .collect(toList());
+ }
+
+ private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
+ jobs.forEach(job -> pushBackJobAsync(broker, job));
+ }
+
+ private void accountThreadId() {
+ threadsIds.add(Thread.currentThread().getId());
+ }
+
+ @AfterMethod
+ public void threadsCounter() {
+ logger.info("participating threads count: " + threadsIds.size());
+ threadsIds.clear();
+ }
+
+ @BeforeMethod
+ public void initializeBroker() {
+ broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
+ ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
+ }
+
+ @Test(enabled = false)
+ public void givenSingleJob_getIt_verifySameJob() {
+ final Job originalJob = waitForFutureJob(newJobAsync(broker));
+
+ final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
+ assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
+ }
+
+ @Test(enabled = false)
+ public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
+ final List<Job> originalJobs = putALotOfJobs(broker);
+
+ MILLISECONDS.sleep(FEW);
+ assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
+
+ final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
+
+ MILLISECONDS.sleep(FEW);
+ assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
+
+ pushBackJobAsync(broker, retrievedJob);
+
+ MILLISECONDS.sleep(FEW);
+ assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
+ }
+
+ @Test(enabled = false)
+ public void givenManyJobs_getThemAll_verifySameJobs() {
+ putAndGetALotOfJobs(broker);
+ }
+
+ @Test(enabled = false)
+ public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
+ final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
+
+ pushBackJobs(retrievedJobs1, broker);
+ final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
+
+ assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
+ }
+
+ private static Date toDate(LocalDateTime localDateTime) {
+ return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
+ }
+
+ private void setModifiedDateToJob(UUID jobUuid, Date date) {
+ DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
+ job.setModified(date);
+ DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
+ session.saveOrUpdate(job);
+ return 1;
+ });
+ }
+
+
+ public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
+ return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
+ }
+
+ public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
+ JobDaoImpl job = new JobDaoImpl();
+ job.setUuid(UUID.randomUUID());
+ job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
+ job.setIndexInBulk(indexInBulk);
+ job.setTemplateId(templateId);
+ job.setType(JobType.NoOp);
+ job.setStatus(status);
+ job.setTakenBy(takenBy);
+ job.setCreated(toDate(date));
+ job.setModified(toDate(date));
+ job.setUserId(userId);
+ if (deleted) {
+ job.setDeletedAt(new Date());
+ }
+ return job;
+ }
+
+ @DataProvider
+ public static Object[][] jobs(Method test) {
+ LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
+ UUID sameTemplate = UUID.randomUUID();
+ return new Object[][]{
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
+ () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
+ 4,
+ 0,
+ PENDING,
+ "Broker should pull the first pending job by oldest date then by job index"
+ },
+ { ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
+ () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
+ () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
+ () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
+ () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
+ 6,
+ 5,
+ PENDING,
+ "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 2,
+ -1,
+ PENDING,
+ "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 2,
+ -1,
+ PENDING,
+ "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
+ ),
+ 3,
+ 2,
+ PENDING,
+ "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
+ () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 3,
+ -1,
+ PENDING,
+ "Broker should not pull any job when there is another job from this template that was taken"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
+ () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 3,
+ -1,
+ PENDING,
+ "Broker should not pull any job when there is another job from this template that in progress"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
+ () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
+ () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 3,
+ -1,
+ PENDING,
+ "Broker should not pull any job when there is another job from this template that was failed"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
+ () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
+ () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
+ 3,
+ 2,
+ PENDING,
+ "Broker should pull pending job when there is another job from this template that was deleted, although failed"
+ },
+ { ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 3,
+ 2,
+ PENDING,
+ "Broker should prioritize jobs of user that has no in-progress jobs"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
+ 3,
+ 2,
+ PENDING,
+ "Broker should prioritize jobs of user that has no taken jobs"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
+ 5,
+ 4,
+ PENDING,
+ "Broker should take oldest job when there is one in-progress job to each user"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
+ 2,
+ -1,
+ PENDING,
+ "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
+ 20,
+ 1,
+ IN_PROGRESS,
+ "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
+ () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
+ () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
+ () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
+ () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
+ () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
+ ),
+ 20,
+ 6,
+ IN_PROGRESS,
+ "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
+ () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
+ () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
+ () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
+ () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
+ () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
+ () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
+ ),
+ 20,
+ 6,
+ RESOURCE_IN_PROGRESS,
+ "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
+ 20,
+ -1,
+ IN_PROGRESS,
+ "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
+ 20,
+ -1,
+ RESOURCE_IN_PROGRESS,
+ "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
+ () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
+ () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
+ 1,
+ 2,
+ CREATING,
+ "Broker with creating topic should pull oldest creating job and ignore mso limit"
+ },
+ {ImmutableList.of(
+ (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
+ 1,
+ 0,
+ CREATING,
+ "Broker with CREATING topic should pull CREATING job that was just modified"
+ }
+
+ };
+ }
+
+ public interface Jobber {
+ // Will defer LocalDateTime.now() to test's "real-time"
+ JobDaoImpl toJob();
+ }
+
+ @Test(enabled = false, dataProvider = "jobs")
+ public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
+ JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
+ final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
+ for (JobDaoImpl job : jobs) {
+ Date modifiedDate = job.getModified();
+ broker.add(job);
+ setModifiedDateToJob(job.getUuid(), modifiedDate);
+ }
+ Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
+ boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
+ String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
+ Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
+ if (shouldAnyBeSelected) {
+ Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
+ }
+ }
+
+ @DataProvider
+ public Object[][] topics() {
+ return Arrays.stream(Job.JobStatus.values())
+ .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
+ .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
+ }
+
+ @Test(enabled = false, dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
+ public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
+ broker.pull(topic, UUID.randomUUID().toString());
+ }
+
+ @Test(enabled = false, expectedExceptions = NoJobException.class)
+ public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
+ Stream.of(Job.JobStatus.values())
+ .filter(not(s -> s.equals(PENDING)))
+ .map(s -> createMockJob("some user id", s))
+ .map(job -> newJobAsync(broker, job))
+ .map(this::waitForFutureJob)
+ .collect(toList());
+
+ waitForFutureOptionalJob(pullJobAsync(broker));
+ }
+
+ @Test(enabled = false)
+ public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
+ newJobAsync(broker); // this negated the expected result of the call below
+ givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
+ }
+
+ @Test(enabled = false, expectedExceptions = NoJobException.class)
+ public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
+ putAndGetALotOfJobs(broker);
+ waitForFutureOptionalJob(pullJobAsync(broker));
+ }
+
+ @Test(enabled = false, expectedExceptions = NoJobException.class)
+ public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
+ final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
+ assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
+ waitForFutureOptionalJob(futureOptionalJob);
+ }
+
+ @Test(enabled = false, expectedExceptions = IllegalStateException.class)
+ public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
+ waitForFutureJob(newJobAsync(broker));
+ waitForFutureJob(newJobAsync(broker));
+ waitForFutureOptionalJob(pullJobAsync(broker));
+
+ Job myJob = createMockJob("user id");
+ myJob.setUuid(UUID.randomUUID());
+
+ broker.pushBack(myJob); //Should fail
+ }
+
+ @Test(enabled = false)
+ public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
+ final ImmutableMap<String, Object> randomDataForMostRecentJobType =
+ ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
+
+ waitForFutureJob(newJobAsync(broker));
+ final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
+
+ job.setStatus(Job.JobStatus.PENDING);
+ job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
+ job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
+ job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
+
+ broker.pushBack(job);
+ final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
+
+ assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
+ assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
+ assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
+ }
+
+ private static String jobDataReflected(Job job) {
+ return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
+ .setExcludeFieldNames("created", "modified", "takenBy")
+ .toString();
+ }
+
+ @Test(enabled = false, expectedExceptions = IllegalStateException.class)
+ public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
+ waitForFutureJob(newJobAsync(broker));
+ final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
+
+ broker.pushBack(job);
+ broker.pushBack(job); //Should fail
+ }
+
+ @Test(enabled = false)
+ public void addJob_PeekItById_verifySameJobWasPeeked() {
+ String userId = UUID.randomUUID().toString();
+ Job myJob = createMockJob(userId);
+ UUID uuid = broker.add(myJob);
+ Job peekedJob = broker.peek(uuid);
+ assertEquals("added testId is not the same as peeked TestsId",
+ userId,
+ peekedJob.getSharedData().getUserId());
+ }
+
+ @Test(enabled = false, dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
+ public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
+ final Job job = waitForFutureJob(newJobAsync(broker, status));
+ broker.delete(job.getUuid());
+ assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
+ waitForFutureOptionalJob(pullJobAsync(broker));
+ }
+
+ @DataProvider
+ public static Object[][] jobStatusesForSuccessDelete() {
+ return new Object[][]{
+ {PENDING},
+ {STOPPED}
+ };
+ }
+
+ @Test(enabled = false,
+ dataProvider = "jobStatusesForFailedDelete",
+ expectedExceptions = OperationNotAllowedException.class,
+ expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
+ )
+ public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
+ final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
+
+ if (taken) {
+ waitForFutureOptionalJob(pullJobAsync(broker));
+ }
+
+
+ broker.delete(job.getUuid());
+ }
+
+ @DataProvider
+ public static Object[][] jobStatusesForFailedDelete() {
+ return new Object[][]{
+ {PENDING, true},
+ {IN_PROGRESS, false},
+ {COMPLETED, false},
+ {PAUSE, false},
+ {FAILED, false},
+ };
+ }
+
+ @Test(enabled = false, expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
+ public void deleteJob_notExist_exceptionIsThrown() {
+ waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
+ broker.delete(new UUID(111, 111));
+ }
+
+}