diff options
Diffstat (limited to 'catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java')
-rw-r--r-- | catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java | 293 |
1 files changed, 146 insertions, 147 deletions
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java b/catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java index 74f0eac5ad..29b43c1920 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/distribution/TestQueue.java @@ -20,6 +20,11 @@ package org.openecomp.sdc.be.distribution; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler; +import org.openecomp.sdc.be.components.distribution.engine.INotificationData; +import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl; + import java.util.ArrayList; import java.util.List; import java.util.Timer; @@ -35,154 +40,148 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler; -import org.openecomp.sdc.be.components.distribution.engine.INotificationData; -import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - public class TestQueue { - public static void main(String[] args) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setNameFormat("distribution-notification-thread"); - ThreadFactory threadFactory = threadFactoryBuilder.build(); - // TODO: add the package of google to the pom - - ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); - // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L, - // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); - - // 2 threads are always up and they handle the tasks. in case core size - // is 0, only one is handles the tasks. - // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L, - // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); - - // TODO : check what happen when the number of threads are full. Throw - // RejectedExecutionException - // TODO : check what happen whether the pool is full and the size of - // pool - - ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory); - Runnable task = new Runnable() { - - @Override - public void run() { - try { - System.out.println("iN SLEEP" + Thread.currentThread()); - Thread.sleep(10 * 1000); - System.out.println("OUT SLEEP"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }; - - for (int i = 0; i < 4; i++) { - try { - executorService.submit(task); - } catch (RejectedExecutionException e) { - e.printStackTrace(); - } - } - - newCachedThreadPool.submit(task); - System.out.println("After submitting the task"); - - MyWorker[] watchThreads = new MyWorker[1]; - BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); - for (int i = 0; i < watchThreads.length; i++) { - MyWorker myWorker = new MyWorker(queue); - myWorker.start(); - } - - for (int i = 0; i < 1; i++) { - try { - queue.put("message " + i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - } - - public static class MyTimerTask extends TimerTask { - - AtomicBoolean state; - Thread thread; - - public MyTimerTask(AtomicBoolean state, Thread thread) { - super(); - this.state = state; - this.thread = thread; - - System.out.println("After create timer"); - } - - @Override - public void run() { - System.out.println("In running of Timer task"); - if (state.get() == false) { - System.out.println("In running of Timer task. Going to interrupt thread"); - // thread.interrupt(); - } else { - System.out.println("In running of Timer task. Finished."); - } - } - - } - - public static class MyWorker extends Thread { - - boolean active = true; - private final BlockingQueue<String> queue; - - public MyWorker(BlockingQueue<String> queue) { - this.queue = queue; - } - - Timer timer = new Timer(); - - public void run() { - try { - while (active) { - String s = queue.take(); - System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s); - - AtomicBoolean atomicBoolean = new AtomicBoolean(false); - MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this); - timer.schedule(myTimerTask, 10 * 1000); - doWork(s); - atomicBoolean.set(true); - - } - } catch (InterruptedException ie) { - - System.out.println("Interrupted our thread"); - ie.printStackTrace(); - } - } - - private void doWork(String s) { - // TODO Auto-generated method stub - - CambriaHandler cambriaHandler = new CambriaHandler(); - INotificationData data = new NotificationDataImpl(); - List<String> servers = new ArrayList<>(); - servers.add("aaaaaaa"); - cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data); - - System.out.println("IN WORK " + s); - try { - Thread.sleep(1 * 1000); - } catch (InterruptedException e) { - - for (int i = 0; i < 10; i++) { - System.out.println("*************************************************"); - } - e.printStackTrace(); - } - } - } + public static void main(String[] args) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setNameFormat("distribution-notification-thread"); + ThreadFactory threadFactory = threadFactoryBuilder.build(); + // TODO: add the package of google to the pom + + ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); + // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L, + // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); + + // 2 threads are always up and they handle the tasks. in case core size + // is 0, only one is handles the tasks. + // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L, + // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20)); + + // TODO : check what happen when the number of threads are full. Throw + // RejectedExecutionException + // TODO : check what happen whether the pool is full and the size of + // pool + + ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory); + Runnable task = new Runnable() { + + @Override + public void run() { + try { + System.out.println("iN SLEEP" + Thread.currentThread()); + Thread.sleep(10 * 1000); + System.out.println("OUT SLEEP"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + + for (int i = 0; i < 4; i++) { + try { + executorService.submit(task); + } catch (RejectedExecutionException e) { + e.printStackTrace(); + } + } + + newCachedThreadPool.submit(task); + System.out.println("After submitting the task"); + + MyWorker[] watchThreads = new MyWorker[1]; + BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); + for (int i = 0; i < watchThreads.length; i++) { + MyWorker myWorker = new MyWorker(queue); + myWorker.start(); + } + + for (int i = 0; i < 1; i++) { + try { + queue.put("message " + i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + public static class MyTimerTask extends TimerTask { + + AtomicBoolean state; + Thread thread; + + public MyTimerTask(AtomicBoolean state, Thread thread) { + super(); + this.state = state; + this.thread = thread; + + System.out.println("After create timer"); + } + + @Override + public void run() { + System.out.println("In running of Timer task"); + if (state.get() == false) { + System.out.println("In running of Timer task. Going to interrupt thread"); + // thread.interrupt(); + } else { + System.out.println("In running of Timer task. Finished."); + } + } + + } + + public static class MyWorker extends Thread { + + boolean active = true; + private final BlockingQueue<String> queue; + + public MyWorker(BlockingQueue<String> queue) { + this.queue = queue; + } + + Timer timer = new Timer(); + + public void run() { + try { + while (active) { + String s = queue.take(); + System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s); + + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this); + timer.schedule(myTimerTask, 10 * 1000); + doWork(s); + atomicBoolean.set(true); + + } + } catch (InterruptedException ie) { + + System.out.println("Interrupted our thread"); + ie.printStackTrace(); + } + } + + private void doWork(String s) { + // TODO Auto-generated method stub + + CambriaHandler cambriaHandler = new CambriaHandler(); + INotificationData data = new NotificationDataImpl(); + List<String> servers = new ArrayList<>(); + servers.add("aaaaaaa"); + cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data); + + System.out.println("IN WORK " + s); + try { + Thread.sleep(1 * 1000); + } catch (InterruptedException e) { + + for (int i = 0; i < 10; i++) { + System.out.println("*************************************************"); + } + e.printStackTrace(); + } + } + } } |