diff options
Diffstat (limited to 'dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java')
-rw-r--r-- | dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java | 584 |
1 files changed, 0 insertions, 584 deletions
diff --git a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java deleted file mode 100644 index a4b1f73..0000000 --- a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java +++ /dev/null @@ -1,584 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openecomp - * ================================================================================ - * Copyright (C) 2016 - 2017 AT&T - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tomcat.jdbc.pool; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.NoSuchElementException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - -/** - * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b> - * - * - * An implementation of a blocking queue with fairness waiting and lock dispersal to avoid contention. - * invocations to method poll(...) will get handed out in the order they were received. - * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a - * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state. - * <br> - * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented. - * - * @param <E> Type of element in the queue - */ - -public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> { - - final int LOCK_COUNT = Runtime.getRuntime().availableProcessors(); - - final AtomicInteger putQueue = new AtomicInteger(0); - final AtomicInteger pollQueue = new AtomicInteger(0); - - public int getNextPut() { - int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT; - return idx; - } - - public int getNextPoll() { - int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT; - return idx; - } - /** - * Phase one entry lock in order to give out - * per-thread-locks for the waiting phase we have - * a phase one lock during the contention period. - */ - private final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT]; - - /** - * All the objects in the pool are stored in a simple linked list - */ - final LinkedList<E>[] items; - - /** - * All threads waiting for an object are stored in a linked list - */ - final LinkedList<ExchangeCountDownLatch<E>>[] waiters; - - /** - * Creates a new fair blocking queue. - */ - @SuppressWarnings("unchecked") // Can create arrays of generic types - public MultiLockFairBlockingQueue() { - items = new LinkedList[LOCK_COUNT]; - waiters = new LinkedList[LOCK_COUNT]; - for (int i=0; i<LOCK_COUNT; i++) { - items[i] = new LinkedList<>(); - waiters[i] = new LinkedList<>(); - locks[i] = new ReentrantLock(false); - } - } - - //------------------------------------------------------------------ - // USED BY CONPOOL IMPLEMENTATION - //------------------------------------------------------------------ - /** - * Will always return true, queue is unbounded. - * {@inheritDoc} - */ - @Override - public boolean offer(E e) { - int idx = getNextPut(); - //during the offer, we will grab the main lock - final ReentrantLock lock = this.locks[idx]; - lock.lock(); - ExchangeCountDownLatch<E> c = null; - try { - //check to see if threads are waiting for an object - if (waiters[idx].size() > 0) { - //if threads are waiting grab the latch for that thread - c = waiters[idx].poll(); - //give the object to the thread instead of adding it to the pool - c.setItem(e); - } else { - //we always add first, so that the most recently used object will be given out - items[idx].addFirst(e); - } - } finally { - lock.unlock(); - } - //if we exchanged an object with another thread, wake it up. - if (c!=null) c.countDown(); - //we have an unbounded queue, so always return true - return true; - } - - /** - * Will never timeout, as it invokes the {@link #offer(Object)} method. - * Once a lock has been acquired, the - * {@inheritDoc} - */ - @Override - public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - return offer(e); - } - - /** - * Fair retrieval of an object in the queue. - * Objects are returned in the order the threads requested them. - * {@inheritDoc} - */ - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - int idx = getNextPoll(); - E result = null; - final ReentrantLock lock = this.locks[idx]; - try { - //acquire the global lock until we know what to do - lock.lock(); - //check to see if we have objects - result = items[idx].poll(); - if (result==null && timeout>0) { - //the queue is empty we will wait for an object - ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1); - //add to the bottom of the wait list - waiters[idx].addLast(c); - //unlock the global lock - lock.unlock(); - //wait for the specified timeout - if (!c.await(timeout, unit)) { - //if we timed out, remove ourselves from the waitlist - lock.lock(); - waiters[idx].remove(c); - lock.unlock(); - } - //return the item we received, can be null if we timed out - result = c.getItem(); - } else { - //we have an object, release - lock.unlock(); - } - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - return result; - } - - /** - * Request an item from the queue asynchronously - * @return - a future pending the result from the queue poll request - */ - public Future<E> pollAsync() { - int idx = getNextPoll(); - Future<E> result = null; - final ReentrantLock lock = this.locks[idx]; - try { - //grab the global lock - lock.lock(); - //check to see if we have objects in the queue - E item = items[idx].poll(); - if (item==null) { - //queue is empty, add ourselves as waiters - ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1); - waiters[idx].addLast(c); - //return a future that will wait for the object - result = new ItemFuture<>(c); - } else { - //return a future with the item - result = new ItemFuture<>(item); - } - } finally { - lock.unlock(); - } - return result; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean remove(Object e) { - for (int idx=0; idx<LOCK_COUNT; idx++) { - final ReentrantLock lock = this.locks[idx]; - lock.lock(); - try { - boolean result = items[idx].remove(e); - if (result) return result; - } finally { - lock.unlock(); - } - } - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public int size() { - int size = 0; - for (int idx=0; idx<LOCK_COUNT; idx++) { - size += items[idx].size(); - } - return size; - } - - /** - * {@inheritDoc} - */ - @Override - public Iterator<E> iterator() { - return new FairIterator(); - } - - /** - * {@inheritDoc} - */ - @Override - public E poll() { - int idx = getNextPoll(); - final ReentrantLock lock = this.locks[idx]; - lock.lock(); - try { - return items[idx].poll(); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean contains(Object e) { - for (int idx=0; idx<LOCK_COUNT; idx++) { - boolean result = items[idx].contains(e); - if (result) return result; - } - return false; - } - - - //------------------------------------------------------------------ - // NOT USED BY CONPOOL IMPLEMENTATION - //------------------------------------------------------------------ - /** - * {@inheritDoc} - */ - @Override - public boolean add(E e) { - return offer(e); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public int drainTo(Collection<? super E> c, int maxElements) { - throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public int drainTo(Collection<? super E> c) { - return drainTo(c,Integer.MAX_VALUE); - } - - /** - * {@inheritDoc} - */ - @Override - public void put(E e) throws InterruptedException { - offer(e); - } - - /** - * {@inheritDoc} - */ - @Override - public int remainingCapacity() { - return Integer.MAX_VALUE - size(); - } - - /** - * {@inheritDoc} - */ - @Override - public E take() throws InterruptedException { - return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean addAll(Collection<? extends E> c) { - Iterator<? extends E> i = c.iterator(); - while (i.hasNext()) { - E e = i.next(); - offer(e); - } - return true; - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public void clear() { - throw new UnsupportedOperationException("void clear()"); - - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public boolean containsAll(Collection<?> c) { - throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)"); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isEmpty() { - return size() == 0; - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public boolean removeAll(Collection<?> c) { - throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public boolean retainAll(Collection<?> c) { - throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public Object[] toArray() { - throw new UnsupportedOperationException("Object[] toArray()"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public <T> T[] toArray(T[] a) { - throw new UnsupportedOperationException("<T> T[] toArray(T[] a)"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public E element() { - throw new UnsupportedOperationException("E element()"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public E peek() { - throw new UnsupportedOperationException("E peek()"); - } - - /** - * {@inheritDoc} - * @throws UnsupportedOperationException - this operation is not supported - */ - @Override - public E remove() { - throw new UnsupportedOperationException("E remove()"); - } - - - - //------------------------------------------------------------------ - // Non cancellable Future used to check and see if a connection has been made available - //------------------------------------------------------------------ - protected class ItemFuture<T> implements Future<T> { - protected volatile T item = null; - protected volatile ExchangeCountDownLatch<T> latch = null; - protected volatile boolean canceled = false; - - public ItemFuture(T item) { - this.item = item; - } - - public ItemFuture(ExchangeCountDownLatch<T> latch) { - this.latch = latch; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; //don't allow cancel for now - } - - @Override - public T get() throws InterruptedException, ExecutionException { - if (item!=null) { - return item; - } else if (latch!=null) { - latch.await(); - return latch.getItem(); - } else { - throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); - } - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (item!=null) { - return item; - } else if (latch!=null) { - boolean timedout = !latch.await(timeout, unit); - if (timedout) throw new TimeoutException(); - else return latch.getItem(); - } else { - throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); - } - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return (item!=null || latch.getItem()!=null); - } - - } - - //------------------------------------------------------------------ - // Count down latch that can be used to exchange information - //------------------------------------------------------------------ - protected class ExchangeCountDownLatch<T> extends CountDownLatch { - protected volatile T item; - public ExchangeCountDownLatch(int i) { - super(i); - } - public T getItem() { - return item; - } - public void setItem(T item) { - this.item = item; - } - } - - //------------------------------------------------------------------ - // Iterator safe from concurrent modification exceptions - //------------------------------------------------------------------ - protected class FairIterator implements Iterator<E> { - E[] elements = null; - int index; - E element = null; - - @SuppressWarnings("unchecked") // Can't create arrays of generic types - public FairIterator() { - ArrayList<E> list = new ArrayList<>(MultiLockFairBlockingQueue.this.size()); - for (int idx=0; idx<LOCK_COUNT; idx++) { - final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx]; - lock.lock(); - try { - elements = (E[]) new Object[MultiLockFairBlockingQueue.this.items[idx].size()]; - MultiLockFairBlockingQueue.this.items[idx].toArray(elements); - - } finally { - lock.unlock(); - } - } - index = 0; - elements = (E[]) new Object[list.size()]; - list.toArray(elements); - } - @Override - public boolean hasNext() { - return index<elements.length; - } - - @Override - public E next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - element = elements[index++]; - return element; - } - - @Override - public void remove() { - for (int idx=0; idx<LOCK_COUNT; idx++) { - final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx]; - lock.lock(); - try { - boolean result = MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]); - if (result) break; - } finally { - lock.unlock(); - } - } - - } - - } -} |