From 6ad41e3ccd398a2721f41ad61c80b7bb03f7d127 Mon Sep 17 00:00:00 2001 From: Ittay Stern Date: Mon, 31 Dec 2018 17:21:27 +0200 Subject: Merge from ECOMP's repository Main Features -------------- - Async-Instantiation jobs mechanism major update; still WIP (package `org.onap.vid.job`) - New features in View/Edit: Activate fabric configuration; show related networks; soft delete - Support AAI service-tree traversal (`AAIServiceTree`) - In-memory cache for SDC models and certain A&AI queries (`CacheProviderWithLoadingCache`) - Upgrade TOSCA Parser and add parsing options; fix malformed TOSCA models - Resolve Cloud-Owner values for MSO - Pass X-ONAP headers to MSO Infrastructure -------------- - Remove codehaus' jackson mapper; use soley fasterxml 2.9.7 - Surefire invokes both TestNG and JUnit tests - Support Kotlin source files - AaiController2 which handles errors in a "Spring manner" - Inline generated-sources and remove jsonschema2pojo Quality -------- - Cumulative bug fixes (A&AI API, UI timeouts, and many more) - Many Sonar issues cleaned-up - Some unused classes removed - Minor changes in vid-automation project, allowing some API verification to run Hard Merges ------------ - HTTP Clients (MSO, A&AI, WebConfig, OutgoingRequestHeadersTest) - Moved `package org.onap.vid.controllers` to `controller`, without plural -- just to keep semantic sync with ECOMP. Reference commit in ECOMP: 3d1141625 Issue-ID: VID-378 Change-Id: I9c8d1e74caa41815891d441fc0760bb5f29c5788 Signed-off-by: Ittay Stern --- .../onap/vid/services/JobsBrokerServiceTest.java | 1287 ++++++++++---------- 1 file changed, 673 insertions(+), 614 deletions(-) (limited to 'vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java') diff --git a/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java b/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java index ff4b34f5f..3686dc26e 100644 --- a/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java +++ b/vid-app-common/src/test/java/org/onap/vid/services/JobsBrokerServiceTest.java @@ -1,616 +1,675 @@ package org.onap.vid.services; -// -//import com.google.common.collect.ImmutableList; -//import com.google.common.collect.ImmutableMap; -//import org.apache.commons.lang.RandomStringUtils; -//import org.apache.commons.lang3.RandomUtils; -//import org.apache.commons.lang3.builder.ReflectionToStringBuilder; -//import org.apache.commons.lang3.builder.ToStringStyle; -//import org.hibernate.SessionFactory; -//import org.onap.vid.exceptions.GenericUncheckedException; -//import org.onap.vid.exceptions.OperationNotAllowedException; -//import org.onap.vid.job.Job; -//import org.onap.vid.job.JobAdapter; -//import org.onap.vid.job.JobType; -//import org.onap.vid.job.JobsBrokerService; -//import org.onap.vid.job.impl.JobDaoImpl; -//import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl; -//import org.onap.vid.utils.DaoUtils; -//import org.onap.vid.config.DataSourceConfig; -//import org.onap.vid.config.JobAdapterConfig; -//import org.onap.portalsdk.core.domain.support.DomainVo; -//import org.onap.portalsdk.core.service.DataAccessService; -//import org.onap.portalsdk.core.util.SystemProperties; -//import org.springframework.test.context.ContextConfiguration; -//import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; -//import org.testng.Assert; -//import org.testng.annotations.AfterMethod; -//import org.testng.annotations.BeforeMethod; -//import org.testng.annotations.DataProvider; -//import org.testng.annotations.Test; -// -//import javax.inject.Inject; -//import java.lang.reflect.Method; -//import java.time.LocalDateTime; -//import java.time.ZoneId; -//import java.util.*; -//import java.util.concurrent.*; -//import java.util.stream.Collectors; -//import java.util.stream.IntStream; -//import java.util.stream.Stream; -// -//import static java.util.concurrent.TimeUnit.MILLISECONDS; -//import static org.hamcrest.CoreMatchers.equalTo; -//import static org.hamcrest.CoreMatchers.is; -//import static org.hamcrest.MatcherAssert.assertThat; -//import static org.hamcrest.Matchers.both; -//import static org.hamcrest.Matchers.containsInAnyOrder; -//import static org.onap.vid.job.Job.JobStatus.*; -//import static org.onap.vid.utils.Streams.not; -//import static org.testng.Assert.assertNotNull; -//import static org.testng.AssertJUnit.assertEquals; -// -//@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class}) -//public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests { -// -// private static final int JOBS_COUNT = 127; -// private static final boolean DELETED = true; -// private final ExecutorService executor = Executors.newFixedThreadPool(90); -// -// private final Set threadsIds = new ConcurrentSkipListSet<>(); -// -// private final long FEW = 500; -// -// private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same"; -// private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same"; -// private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue"; -// private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist"; -// private JobsBrokerService broker; -// -// @Inject -// JobAdapter jobAdapter; -// @Inject -// private DataAccessService dataAccessService; -// @Inject -// private SessionFactory sessionFactory; -// -// /* -// - pulling jobs is limited to inserted ones -// - putting back allows getting the job again -// - multi threads safety -// - any added job should be visible with view -// -// - edges: -// - pulling with empty repo should return empty optional -// - pulling more than expected should return empty optional -// - putting one, over-pulling from a different thread -// - take before inserting, then insert while waiting -// -// */ -// -// private class NoJobException extends RuntimeException { -// } -// -// private Future newJobAsync(JobsBrokerService b) { -// return newJobAsync(b, createMockJob("user id")); -// } -// -// private Future newJobAsync(JobsBrokerService b, Job.JobStatus status) { -// return newJobAsync(b, createMockJob("user id", status)); -// } -// -// private Job createMockJob(String userId) { -// return jobAdapter.createJob( -// JobType.NoOp, -// new JobAdapter.AsyncJobRequest() { -// public int nothing = 42; -// }, -// UUID.randomUUID(), -// userId, -// RandomUtils.nextInt()); -// } -// -// private Job createMockJob(String userId, Job.JobStatus jobStatus) { -// Job job = createMockJob(userId); -// job.setStatus(jobStatus); -// return job; -// } -// -// private Future newJobAsync(JobsBrokerService b, Job job) { -// final Future jobFuture = executor.submit(() -> { -// accountThreadId(); -// -// b.add(job); -// -// return job; -// }); -// return jobFuture; -// } -// -// private void pushBackJobAsync(JobsBrokerService b, Job job) { -// executor.submit(() -> { -// accountThreadId(); -// b.pushBack(job); -// return job; -// }); -// } -// -// private Future> pullJobAsync(JobsBrokerService broker) { -// final Future> job = executor.submit(() -> { -// accountThreadId(); -// // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs -// return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString()); -// }); -// return job; -// } -// -// private Job waitForFutureOptionalJob(Future> retrievedOptionalJobFuture) { -// try { -// return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new); -// } catch (TimeoutException | InterruptedException | ExecutionException e) { -// throw new RuntimeException(e); -// } -// } -// -// private Job waitForFutureJob(Future retrievedJobFuture) { -// try { -// return retrievedJobFuture.get(FEW, MILLISECONDS); -// } catch (TimeoutException | InterruptedException | ExecutionException e) { -// throw new RuntimeException(e); -// } -// } -// -// private List putAndGetALotOfJobs(JobsBrokerService broker) { -// final List originalJobs = putALotOfJobs(broker); -// final List retrievedJobs = getAlotOfJobs(broker); -// -// assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray())); -// -// return retrievedJobs; -// } -// -// private List putALotOfJobs(JobsBrokerService broker) { -// int n = JOBS_COUNT; -// return IntStream.range(0, n) -// .mapToObj(i -> newJobAsync(broker)) -// .map(this::waitForFutureJob) -// .collect(Collectors.toList()); -// } -// -// private List getAlotOfJobs(JobsBrokerService broker) { -// int n = JOBS_COUNT; -// return IntStream.range(0, n) -// .mapToObj(i -> pullJobAsync(broker)) -// .map(this::waitForFutureOptionalJob) -// .collect(Collectors.toList()); -// } -// -// private void pushBackJobs(List jobs, JobsBrokerService broker) { -// jobs.forEach(job -> pushBackJobAsync(broker, job)); -// } -// -// private void accountThreadId() { -// threadsIds.add(Thread.currentThread().getId()); -// } -// -// @AfterMethod -// public void threadsCounter() { -// System.out.println("participating threads count: " + threadsIds.size()); -// threadsIds.clear(); -// } -// -// @BeforeMethod -// public void initializeBroker() { -// broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0); -// ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll(); -// } -// -// @Test -// public void givenSingleJob_getIt_verifySameJob() { -// final Job originalJob = waitForFutureJob(newJobAsync(broker)); -// -// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); -// assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob)); -// } -// -// @Test -// public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException { -// final List originalJobs = putALotOfJobs(broker); -// -// MILLISECONDS.sleep(FEW); -// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); -// -// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); -// -// MILLISECONDS.sleep(FEW); -// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); -// -// pushBackJobAsync(broker, retrievedJob); -// -// MILLISECONDS.sleep(FEW); -// assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); -// } -// -// @Test -// public void givenManyJobs_getThemAll_verifySameJobs() { -// putAndGetALotOfJobs(broker); -// } -// -// @Test -// public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() { -// final List retrievedJobs1 = putAndGetALotOfJobs(broker); -// -// pushBackJobs(retrievedJobs1, broker); -// final List retrievedJobs2 = getAlotOfJobs(broker); -// -// assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray())); -// } -// -// private static Date toDate(LocalDateTime localDateTime) { -// return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); -// } -// -// private void setModifiedDateToJob(UUID jobUuid, Date date) { -// DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap()); -// job.setModified(date); -// DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> { -// session.saveOrUpdate(job); -// return 1; -// }); -// } -// -// -// public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) { -// return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false); -// } -// -// public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){ -// JobDaoImpl job = new JobDaoImpl(); -// job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15))); -// job.setIndexInBulk(indexInBulk); -// job.setTemplateId(templateId); -// job.setType(JobType.NoOp); -// job.setStatus(status); -// job.setTakenBy(takenBy); -// job.setCreated(toDate(date)); -// job.setModified(toDate(date)); -// job.setUserId(userId); -// if (deleted) { -// job.setDeletedAt(new Date()); -// } -// return job; -// } -// -// @DataProvider -// public static Object[][] jobs(Method test) { -// LocalDateTime oldestDate = LocalDateTime.now().minusHours(30); -// UUID sameTemplate = UUID.randomUUID(); -// return new Object[][]{ -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate), -// createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)), -// createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))), -// 4, -// 0, -// PENDING, -// "Broker should pull the first pending job by oldest date then by job index" -// }, -// { ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate), -// createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate), -// createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate), -// createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate), -// createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))), -// 6, -// 5, -// PENDING, -// "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 2, -// -1, -// PENDING, -// "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate), -// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 2, -// -1, -// PENDING, -// "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate), -// createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// 2, -// PENDING, -// "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate), -// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// -1, -// PENDING, -// "Broker should not pull any job when there is another job from this template that was taken" -// }, -// {ImmutableList.of( -// createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate), -// createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate), -// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// -1, -// PENDING, -// "Broker should not pull any job when there is another job from this template that in progress" -// }, -// {ImmutableList.of( -// createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate), -// createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate), -// createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// -1, -// PENDING, -// "Broker should not pull any job when there is another job from this template that was failed" -// }, -// {ImmutableList.of( -// createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED), -// createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate), -// createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))), -// 3, -// 2, -// PENDING, -// "Broker should pull pending job when there is another job from this template that was deleted, although failed" -// }, -// { ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate), -// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate), -// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// 2, -// PENDING, -// "Broker should prioritize jobs of user that has no in-progress jobs" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate), -// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))), -// 3, -// 2, -// PENDING, -// "Broker should prioritize jobs of user that has no taken jobs" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)), -// createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), -// createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), -// createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)), -// 5, -// 4, -// PENDING, -// "Broker should take oldest job when there is one in-progress job to each user" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate), -// createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate), -// createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))), -// 2, -// -1, -// PENDING, -// "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), -// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate), -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), -// createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))), -// 20, -// 1, -// IN_PROGRESS, -// "Broker with in progress topic should pull the first in progress and not taken job by oldest date" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate), -// createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate), -// createNewJob(13, UUID.randomUUID(), "userId", PENDING,null, oldestDate), -// createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate), -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate, DELETED),createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate), -// createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS,null, LocalDateTime.now().minusHours(2))), -// 20, -// 5, -// IN_PROGRESS, -// "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore deleted,completed, failed, pending and stopped statuses" -// }, -// {ImmutableList.of( -// createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()), -// createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)), -// createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))), -// 20, -// -1, -// IN_PROGRESS, -// "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)" -// } -// -// }; -// } -// -// -// @Test(dataProvider = "jobs") -// public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List jobs, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) { -// JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20); -// for (JobDaoImpl job : jobs) { -// Date modifiedDate = job.getModified(); -// broker.add(job); -// setModifiedDateToJob(job.getUuid(), modifiedDate); -// } -// Optional nextJob = broker.pull(topic, UUID.randomUUID().toString()); -// boolean shouldAnyBeSelected = expectedIndexSelected >= 0; -// Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason); -// if (shouldAnyBeSelected) { -// Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason); -// } -// } -// -// @DataProvider -// public Object[][] topics() { -// return Arrays.stream(Job.JobStatus.values()) -// .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS).contains(t))) -// .map(v -> new Object[]{v}).collect(Collectors.toList()).toArray(new Object[][]{}); -// } -// -// @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*") -// public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) { -// broker.pull(topic, UUID.randomUUID().toString()); -// } -// -// @Test(expectedExceptions = NoJobException.class) -// public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() { -// Stream.of(Job.JobStatus.values()) -// .filter(not(s -> s.equals(PENDING))) -// .map(s -> createMockJob("some user id", s)) -// .map(job -> newJobAsync(broker, job)) -// .map(this::waitForFutureJob) -// .collect(Collectors.toList()); -// -// waitForFutureOptionalJob(pullJobAsync(broker)); -// } -// -// @Test -// public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() { -// newJobAsync(broker); // this negated the expected result of the call below -// givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved(); -// } -// -// @Test(expectedExceptions = NoJobException.class) -// public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() { -// putAndGetALotOfJobs(broker); -// waitForFutureOptionalJob(pullJobAsync(broker)); -// } -// -// @Test(expectedExceptions = NoJobException.class) -// public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException { -// final Future> futureOptionalJob = pullJobAsync(broker); -// assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false)); -// waitForFutureOptionalJob(futureOptionalJob); -// } -// -// @Test(expectedExceptions = IllegalStateException.class) -// public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() { -// waitForFutureJob(newJobAsync(broker)); -// waitForFutureJob(newJobAsync(broker)); -// waitForFutureOptionalJob(pullJobAsync(broker)); -// -// Job myJob = createMockJob("user id"); -// myJob.setUuid(UUID.randomUUID()); -// -// broker.pushBack(myJob); //Should fail -// } -// -// @Test -// public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() { -// final ImmutableMap randomDataForMostRecentJobType = -// ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c")); -// -// waitForFutureJob(newJobAsync(broker)); -// final Job job = waitForFutureOptionalJob(pullJobAsync(broker)); -// -// job.setStatus(Job.JobStatus.PENDING); -// job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning")); -// job.setTypeAndData(JobType.HttpCall, ImmutableMap.of()); -// job.setTypeAndData(JobType.ServiceInstantiation, randomDataForMostRecentJobType); -// -// broker.pushBack(job); -// final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); -// -// assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job)); -// assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType))); -// assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job))); -// } -// -// private static String jobDataReflected(Job job) { -// return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE) -// .setExcludeFieldNames("created", "modified", "takenBy") -// .toString(); -// } -// -// @Test(expectedExceptions = IllegalStateException.class) -// public void givenSingleJob_pushBackTwice_verifyPushingRejected() { -// waitForFutureJob(newJobAsync(broker)); -// final Job job = waitForFutureOptionalJob(pullJobAsync(broker)); -// -// broker.pushBack(job); -// broker.pushBack(job); //Should fail -// } -// -// @Test -// public void addJob_PeekItById_verifySameJobWasPeeked() { -// String userId = UUID.randomUUID().toString(); -// Job myJob = createMockJob(userId); -// UUID uuid = broker.add(myJob); -// Job peekedJob = broker.peek(uuid); -// assertEquals("added testId is not the same as peeked TestsId", -// userId, -// peekedJob.getData().get("userId")); -// } -// -// @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class) -// public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) { -// final Job job = waitForFutureJob(newJobAsync(broker, status)); -// broker.delete(job.getUuid()); -// assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted"); -// waitForFutureOptionalJob(pullJobAsync(broker)); -// } -// -// @DataProvider -// public static Object[][] jobStatusesForSuccessDelete() { -// return new Object[][]{ -// {PENDING}, -// {STOPPED} -// }; -// } -// -// @Test( -// dataProvider = "jobStatusesForFailedDelete", -// expectedExceptions = OperationNotAllowedException.class, -// expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE -// ) -// public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) { -// final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status))); -// -// if (taken) { -// waitForFutureOptionalJob(pullJobAsync(broker)); -// } -// -// -// broker.delete(job.getUuid()); -// } -// -// @DataProvider -// public static Object[][] jobStatusesForFailedDelete() { -// return new Object[][]{ -// {PENDING, true}, -// {IN_PROGRESS, false}, -// {COMPLETED, false}, -// {PAUSE, false}, -// {FAILED, false}, -// }; -// } -// -// @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE) -// public void deleteJob_notExist_exceptionIsThrown() { -// waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING))); -// broker.delete(new UUID(111, 111)); -// } -// -//} + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.hibernate.SessionFactory; +import org.onap.portalsdk.core.domain.support.DomainVo; +import org.onap.portalsdk.core.service.DataAccessService; +import org.onap.portalsdk.core.util.SystemProperties; +import org.onap.vid.config.DataSourceConfig; +import org.onap.vid.config.JobAdapterConfig; +import org.onap.vid.exceptions.GenericUncheckedException; +import org.onap.vid.exceptions.OperationNotAllowedException; +import org.onap.vid.job.Job; +import org.onap.vid.job.JobAdapter; +import org.onap.vid.job.JobType; +import org.onap.vid.job.JobsBrokerService; +import org.onap.vid.job.command.JobCommandFactoryTest; +import org.onap.vid.job.impl.JobDaoImpl; +import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl; +import org.onap.vid.utils.DaoUtils; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.lang.reflect.Method; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.onap.vid.job.Job.JobStatus.*; +import static org.onap.vid.utils.Streams.not; +import static org.testng.Assert.assertNotNull; +import static org.testng.AssertJUnit.assertEquals; + +@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class}) +public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests { + + private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class); + + private static final int JOBS_COUNT = 127; + private static final boolean DELETED = true; + private final ExecutorService executor = Executors.newFixedThreadPool(90); + + private final Set threadsIds = new ConcurrentSkipListSet<>(); + + private final long FEW = 500; + + private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same"; + private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same"; + private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue"; + private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist"; + private JobsBrokerService broker; + + @Inject + JobAdapter jobAdapter; + @Inject + private DataAccessService dataAccessService; + @Inject + private SessionFactory sessionFactory; + + /* + - pulling jobs is limited to inserted ones + - putting back allows getting the job again + - multi threads safety + - any added job should be visible with view + + - edges: + - pulling with empty repo should return empty optional + - pulling more than expected should return empty optional + - putting one, over-pulling from a different thread + - take before inserting, then insert while waiting + + */ + + private class NoJobException extends RuntimeException { + } + + private Future newJobAsync(JobsBrokerService b) { + return newJobAsync(b, createMockJob("user id")); + } + + private Future newJobAsync(JobsBrokerService b, Job.JobStatus status) { + return newJobAsync(b, createMockJob("user id", status)); + } + + private Job createMockJob(String userId) { + return jobAdapter.createServiceInstantiationJob( + JobType.NoOp, + new JobCommandFactoryTest.MockedRequest(42,"nothing") , + UUID.randomUUID(), + userId, + "optimisticUniqueServiceInstanceName", + RandomUtils.nextInt()); + } + + private Job createMockJob(String userId, Job.JobStatus jobStatus) { + Job job = createMockJob(userId); + job.setStatus(jobStatus); + return job; + } + + private Future newJobAsync(JobsBrokerService b, Job job) { + final Future jobFuture = executor.submit(() -> { + accountThreadId(); + + b.add(job); + + return job; + }); + return jobFuture; + } + + private void pushBackJobAsync(JobsBrokerService b, Job job) { + executor.submit(() -> { + accountThreadId(); + b.pushBack(job); + return job; + }); + } + + private Future> pullJobAsync(JobsBrokerService broker) { + final Future> job = executor.submit(() -> { + accountThreadId(); + // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs + return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString()); + }); + return job; + } + + private Job waitForFutureOptionalJob(Future> retrievedOptionalJobFuture) { + try { + return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Job waitForFutureJob(Future retrievedJobFuture) { + try { + return retrievedJobFuture.get(FEW, MILLISECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private List putAndGetALotOfJobs(JobsBrokerService broker) { + final List originalJobs = putALotOfJobs(broker); + final List retrievedJobs = getAlotOfJobs(broker); + + assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray())); + + return retrievedJobs; + } + + private List putALotOfJobs(JobsBrokerService broker) { + int n = JOBS_COUNT; + return IntStream.range(0, n) + .mapToObj(i -> newJobAsync(broker)) + .map(this::waitForFutureJob) + .collect(toList()); + } + + private List getAlotOfJobs(JobsBrokerService broker) { + int n = JOBS_COUNT; + return IntStream.range(0, n) + .mapToObj(i -> pullJobAsync(broker)) + .map(this::waitForFutureOptionalJob) + .collect(toList()); + } + + private void pushBackJobs(List jobs, JobsBrokerService broker) { + jobs.forEach(job -> pushBackJobAsync(broker, job)); + } + + private void accountThreadId() { + threadsIds.add(Thread.currentThread().getId()); + } + + @AfterMethod + public void threadsCounter() { + logger.info("participating threads count: " + threadsIds.size()); + threadsIds.clear(); + } + + @BeforeMethod + public void initializeBroker() { + broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0); + ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll(); + } + + @Test(enabled = false) + public void givenSingleJob_getIt_verifySameJob() { + final Job originalJob = waitForFutureJob(newJobAsync(broker)); + + final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); + assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob)); + } + + @Test(enabled = false) + public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException { + final List originalJobs = putALotOfJobs(broker); + + MILLISECONDS.sleep(FEW); + assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); + + final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); + + MILLISECONDS.sleep(FEW); + assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); + + pushBackJobAsync(broker, retrievedJob); + + MILLISECONDS.sleep(FEW); + assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray())); + } + + @Test(enabled = false) + public void givenManyJobs_getThemAll_verifySameJobs() { + putAndGetALotOfJobs(broker); + } + + @Test(enabled = false) + public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() { + final List retrievedJobs1 = putAndGetALotOfJobs(broker); + + pushBackJobs(retrievedJobs1, broker); + final List retrievedJobs2 = getAlotOfJobs(broker); + + assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray())); + } + + private static Date toDate(LocalDateTime localDateTime) { + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } + + private void setModifiedDateToJob(UUID jobUuid, Date date) { + DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap()); + job.setModified(date); + DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> { + session.saveOrUpdate(job); + return 1; + }); + } + + + public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) { + return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false); + } + + public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){ + JobDaoImpl job = new JobDaoImpl(); + job.setUuid(UUID.randomUUID()); + job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15))); + job.setIndexInBulk(indexInBulk); + job.setTemplateId(templateId); + job.setType(JobType.NoOp); + job.setStatus(status); + job.setTakenBy(takenBy); + job.setCreated(toDate(date)); + job.setModified(toDate(date)); + job.setUserId(userId); + if (deleted) { + job.setDeletedAt(new Date()); + } + return job; + } + + @DataProvider + public static Object[][] jobs(Method test) { + LocalDateTime oldestDate = LocalDateTime.now().minusHours(30); + UUID sameTemplate = UUID.randomUUID(); + return new Object[][]{ + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate), + () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))), + 4, + 0, + PENDING, + "Broker should pull the first pending job by oldest date then by job index" + }, + { ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate), + () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED), + () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate), + () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate), + () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))), + 6, + 5, + PENDING, + "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))), + 2, + -1, + PENDING, + "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))), + 2, + -1, + PENDING, + "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate) + ), + 3, + 2, + PENDING, + "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate), + () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), + 3, + -1, + PENDING, + "Broker should not pull any job when there is another job from this template that was taken" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate), + () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate), + () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), + 3, + -1, + PENDING, + "Broker should not pull any job when there is another job from this template that in progress" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate), + () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate), + () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))), + 3, + -1, + PENDING, + "Broker should not pull any job when there is another job from this template that was failed" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED), + () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate), + () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))), + 3, + 2, + PENDING, + "Broker should pull pending job when there is another job from this template that was deleted, although failed" + }, + { ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))), + 3, + 2, + PENDING, + "Broker should prioritize jobs of user that has no in-progress jobs" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))), + 3, + 2, + PENDING, + "Broker should prioritize jobs of user that has no taken jobs" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)), + 5, + 4, + PENDING, + "Broker should take oldest job when there is one in-progress job to each user" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate), + () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))), + 2, + -1, + PENDING, + "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate), + () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate), + () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))), + 20, + 1, + IN_PROGRESS, + "Broker with in progress topic should pull the first in progress and not taken job by oldest date" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate), + () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate), + () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate), + () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate), + () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate), + () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED), + () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate) + ), + 20, + 6, + IN_PROGRESS, + "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate), + () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate), + () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate), + () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate), + () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate), + () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED), + () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate), + () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)), + () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate) + ), + 20, + 6, + RESOURCE_IN_PROGRESS, + "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()), + () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)), + () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))), + 20, + -1, + IN_PROGRESS, + "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()), + () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)), + () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))), + 20, + -1, + RESOURCE_IN_PROGRESS, + "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()), + () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)), + () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))), + 1, + 2, + CREATING, + "Broker with creating topic should pull oldest creating job and ignore mso limit" + }, + {ImmutableList.of( + (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())), + 1, + 0, + CREATING, + "Broker with CREATING topic should pull CREATING job that was just modified" + } + + }; + } + + public interface Jobber { + // Will defer LocalDateTime.now() to test's "real-time" + JobDaoImpl toJob(); + } + + @Test(enabled = false, dataProvider = "jobs") + public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) { + JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20); + final List jobs = jobbers.stream().map(Jobber::toJob).collect(toList()); + for (JobDaoImpl job : jobs) { + Date modifiedDate = job.getModified(); + broker.add(job); + setModifiedDateToJob(job.getUuid(), modifiedDate); + } + Optional nextJob = broker.pull(topic, UUID.randomUUID().toString()); + boolean shouldAnyBeSelected = expectedIndexSelected >= 0; + String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled"); + Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc); + if (shouldAnyBeSelected) { + Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason); + } + } + + @DataProvider + public Object[][] topics() { + return Arrays.stream(Job.JobStatus.values()) + .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t))) + .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{}); + } + + @Test(enabled = false, dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*") + public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) { + broker.pull(topic, UUID.randomUUID().toString()); + } + + @Test(enabled = false, expectedExceptions = NoJobException.class) + public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() { + Stream.of(Job.JobStatus.values()) + .filter(not(s -> s.equals(PENDING))) + .map(s -> createMockJob("some user id", s)) + .map(job -> newJobAsync(broker, job)) + .map(this::waitForFutureJob) + .collect(toList()); + + waitForFutureOptionalJob(pullJobAsync(broker)); + } + + @Test(enabled = false) + public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() { + newJobAsync(broker); // this negated the expected result of the call below + givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved(); + } + + @Test(enabled = false, expectedExceptions = NoJobException.class) + public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() { + putAndGetALotOfJobs(broker); + waitForFutureOptionalJob(pullJobAsync(broker)); + } + + @Test(enabled = false, expectedExceptions = NoJobException.class) + public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException { + final Future> futureOptionalJob = pullJobAsync(broker); + assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false)); + waitForFutureOptionalJob(futureOptionalJob); + } + + @Test(enabled = false, expectedExceptions = IllegalStateException.class) + public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() { + waitForFutureJob(newJobAsync(broker)); + waitForFutureJob(newJobAsync(broker)); + waitForFutureOptionalJob(pullJobAsync(broker)); + + Job myJob = createMockJob("user id"); + myJob.setUuid(UUID.randomUUID()); + + broker.pushBack(myJob); //Should fail + } + + @Test(enabled = false) + public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() { + final ImmutableMap randomDataForMostRecentJobType = + ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c")); + + waitForFutureJob(newJobAsync(broker)); + final Job job = waitForFutureOptionalJob(pullJobAsync(broker)); + + job.setStatus(Job.JobStatus.PENDING); + job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning")); + job.setTypeAndData(JobType.HttpCall, ImmutableMap.of()); + job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType); + + broker.pushBack(job); + final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker)); + + assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job)); + assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType))); + assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job))); + } + + private static String jobDataReflected(Job job) { + return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE) + .setExcludeFieldNames("created", "modified", "takenBy") + .toString(); + } + + @Test(enabled = false, expectedExceptions = IllegalStateException.class) + public void givenSingleJob_pushBackTwice_verifyPushingRejected() { + waitForFutureJob(newJobAsync(broker)); + final Job job = waitForFutureOptionalJob(pullJobAsync(broker)); + + broker.pushBack(job); + broker.pushBack(job); //Should fail + } + + @Test(enabled = false) + public void addJob_PeekItById_verifySameJobWasPeeked() { + String userId = UUID.randomUUID().toString(); + Job myJob = createMockJob(userId); + UUID uuid = broker.add(myJob); + Job peekedJob = broker.peek(uuid); + assertEquals("added testId is not the same as peeked TestsId", + userId, + peekedJob.getSharedData().getUserId()); + } + + @Test(enabled = false, dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class) + public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) { + final Job job = waitForFutureJob(newJobAsync(broker, status)); + broker.delete(job.getUuid()); + assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted"); + waitForFutureOptionalJob(pullJobAsync(broker)); + } + + @DataProvider + public static Object[][] jobStatusesForSuccessDelete() { + return new Object[][]{ + {PENDING}, + {STOPPED} + }; + } + + @Test(enabled = false, + dataProvider = "jobStatusesForFailedDelete", + expectedExceptions = OperationNotAllowedException.class, + expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE + ) + public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) { + final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status))); + + if (taken) { + waitForFutureOptionalJob(pullJobAsync(broker)); + } + + + broker.delete(job.getUuid()); + } + + @DataProvider + public static Object[][] jobStatusesForFailedDelete() { + return new Object[][]{ + {PENDING, true}, + {IN_PROGRESS, false}, + {COMPLETED, false}, + {PAUSE, false}, + {FAILED, false}, + }; + } + + @Test(enabled = false, expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE) + public void deleteJob_notExist_exceptionIsThrown() { + waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING))); + broker.delete(new UUID(111, 111)); + } + +} -- cgit 1.2.3-korg