diff options
author | 2017-03-31 15:03:13 -0400 | |
---|---|---|
committer | 2017-03-31 15:11:48 -0400 | |
commit | 974b67dd4021e6e839eaad25366bffe6d7a414c8 (patch) | |
tree | c16b969e86323ed8b0914253b1c74d42d31a7ed1 /dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java | |
parent | e0451f75b26082418757f279351c2d3e29c0a5c8 (diff) |
[SDNC-5] Rebase sdnc-core
Upgrade to OpenDaylight Boron release, and sync changes made since 16.10 release to ONAP SDN-C distribution
Change-Id: I20bef9e6d0008c4436b5624ce839bbb70ecc20a5
Signed-off-by: Dan Timoney <dtimoney@att.com>
Diffstat (limited to 'dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java')
-rw-r--r-- | dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java | 579 |
1 files changed, 579 insertions, 0 deletions
diff --git a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java new file mode 100644 index 0000000..c02bfa3 --- /dev/null +++ b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java @@ -0,0 +1,579 @@ +/*- + * ============LICENSE_START======================================================= + * openecomp + * ================================================================================ + * Copyright (C) 2016 - 2017 AT&T + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.jdbc.pool; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * A simple implementation of a blocking queue with fairness waiting. + * invocations to method poll(...) will get handed out in the order they were received. + * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a + * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state. + * <br> + * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented. + * + * @param <E> Type of element in the queue + */ + +public class FairBlockingQueue<E> implements BlockingQueue<E> { + + /** + * This little sucker is used to reorder the way to do + * {@link java.util.concurrent.locks.Lock#lock()}, + * {@link java.util.concurrent.locks.Lock#unlock()} + * and + * {@link java.util.concurrent.CountDownLatch#countDown()} + * during the {@link #poll(long, TimeUnit)} operation. + * On Linux, it performs much better if we count down while we hold the global + * lock, on Solaris its the other way around. + * Until we have tested other platforms we only check for Linux. + */ + static final boolean isLinux = "Linux".equals(System.getProperty("os.name")) && + (!Boolean.getBoolean(FairBlockingQueue.class.getName()+".ignoreOS")); + + /** + * Phase one entry lock in order to give out + * per-thread-locks for the waiting phase we have + * a phase one lock during the contention period. + */ + final ReentrantLock lock = new ReentrantLock(false); + + /** + * All the objects in the pool are stored in a simple linked list + */ + final LinkedList<E> items; + + /** + * All threads waiting for an object are stored in a linked list + */ + final LinkedList<ExchangeCountDownLatch<E>> waiters; + + /** + * Creates a new fair blocking queue. + */ + public FairBlockingQueue() { + items = new LinkedList<>(); + waiters = new LinkedList<>(); + } + + //------------------------------------------------------------------ + // USED BY CONPOOL IMPLEMENTATION + //------------------------------------------------------------------ + /** + * Will always return true, queue is unbounded. + * {@inheritDoc} + */ + @Override + public boolean offer(E e) { + //during the offer, we will grab the main lock + final ReentrantLock lock = this.lock; + lock.lock(); + ExchangeCountDownLatch<E> c = null; + try { + //check to see if threads are waiting for an object + if (waiters.size() > 0) { + //if threads are waiting grab the latch for that thread + c = waiters.poll(); + //give the object to the thread instead of adding it to the pool + c.setItem(e); + if (isLinux) c.countDown(); + } else { + //we always add first, so that the most recently used object will be given out + items.addFirst(e); + } + } finally { + lock.unlock(); + } + //if we exchanged an object with another thread, wake it up. + if (!isLinux && c!=null) c.countDown(); + //we have an unbounded queue, so always return true + return true; + } + + /** + * Will never timeout, as it invokes the {@link #offer(Object)} method. + * Once a lock has been acquired, the + * {@inheritDoc} + */ + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e); + } + + /** + * Fair retrieval of an object in the queue. + * Objects are returned in the order the threads requested them. + * {@inheritDoc} + */ + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E result = null; + final ReentrantLock lock = this.lock; + //acquire the global lock until we know what to do + lock.lock(); + try { + //check to see if we have objects + result = items.poll(); + if (result==null && timeout>0) { + //the queue is empty we will wait for an object + ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1); + //add to the bottom of the wait list + waiters.addLast(c); + //unlock the global lock + lock.unlock(); + boolean didtimeout = true; + InterruptedException interruptedException = null; + try { + //wait for the specified timeout + didtimeout = !c.await(timeout, unit); + } catch (InterruptedException ix) { + interruptedException = ix; + } + if (didtimeout) { + //if we timed out, or got interrupted + // remove ourselves from the waitlist + lock.lock(); + try { + waiters.remove(c); + } finally { + lock.unlock(); + } + } + //return the item we received, can be null if we timed out + result = c.getItem(); + if (null!=interruptedException) { + //we got interrupted + if ( null!=result) { + //we got a result - clear the interrupt status + //don't propagate cause we have removed a connection from pool + Thread.interrupted(); + } else { + throw interruptedException; + } + } + } else { + //we have an object, release + lock.unlock(); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + return result; + } + + /** + * Request an item from the queue asynchronously + * @return - a future pending the result from the queue poll request + */ + public Future<E> pollAsync() { + Future<E> result = null; + final ReentrantLock lock = this.lock; + //grab the global lock + lock.lock(); + try { + //check to see if we have objects in the queue + E item = items.poll(); + if (item==null) { + //queue is empty, add ourselves as waiters + ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1); + waiters.addLast(c); + //return a future that will wait for the object + result = new ItemFuture<>(c); + } else { + //return a future with the item + result = new ItemFuture<>(item); + } + } finally { + lock.unlock(); + } + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean remove(Object e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.remove(e); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int size() { + return items.size(); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator<E> iterator() { + return new FairIterator(); + } + + /** + * {@inheritDoc} + */ + @Override + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.poll(); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean contains(Object e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.contains(e); + } finally { + lock.unlock(); + } + } + + + //------------------------------------------------------------------ + // NOT USED BY CONPOOL IMPLEMENTATION + //------------------------------------------------------------------ + /** + * {@inheritDoc} + */ + @Override + public boolean add(E e) { + return offer(e); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public int drainTo(Collection<? super E> c, int maxElements) { + throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + + @Override + public int drainTo(Collection<? super E> c) { + return drainTo(c,Integer.MAX_VALUE); + } + + /** + * {@inheritDoc} + */ + @Override + public void put(E e) throws InterruptedException { + offer(e); + } + + /** + * {@inheritDoc} + */ + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE - size(); + } + + /** + * {@inheritDoc} + */ + @Override + public E take() throws InterruptedException { + return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean addAll(Collection<? extends E> c) { + Iterator<? extends E> i = c.iterator(); + while (i.hasNext()) { + E e = i.next(); + offer(e); + } + return true; + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public void clear() { + throw new UnsupportedOperationException("void clear()"); + + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public boolean containsAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)"); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isEmpty() { + return size() == 0; + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public Object[] toArray() { + throw new UnsupportedOperationException("Object[] toArray()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public <T> T[] toArray(T[] a) { + throw new UnsupportedOperationException("<T> T[] toArray(T[] a)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public E element() { + throw new UnsupportedOperationException("E element()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public E peek() { + throw new UnsupportedOperationException("E peek()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperationException - this operation is not supported + */ + @Override + public E remove() { + throw new UnsupportedOperationException("E remove()"); + } + + + + //------------------------------------------------------------------ + // Non cancellable Future used to check and see if a connection has been made available + //------------------------------------------------------------------ + protected class ItemFuture<T> implements Future<T> { + protected volatile T item = null; + protected volatile ExchangeCountDownLatch<T> latch = null; + protected volatile boolean canceled = false; + + public ItemFuture(T item) { + this.item = item; + } + + public ItemFuture(ExchangeCountDownLatch<T> latch) { + this.latch = latch; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; //don't allow cancel for now + } + + @Override + public T get() throws InterruptedException, ExecutionException { + if (item!=null) { + return item; + } else if (latch!=null) { + latch.await(); + return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (item!=null) { + return item; + } else if (latch!=null) { + boolean timedout = !latch.await(timeout, unit); + if (timedout) throw new TimeoutException(); + else return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return (item!=null || latch.getItem()!=null); + } + + } + + //------------------------------------------------------------------ + // Count down latch that can be used to exchange information + //------------------------------------------------------------------ + protected class ExchangeCountDownLatch<T> extends CountDownLatch { + protected volatile T item; + public ExchangeCountDownLatch(int i) { + super(i); + } + public T getItem() { + return item; + } + public void setItem(T item) { + this.item = item; + } + } + + //------------------------------------------------------------------ + // Iterator safe from concurrent modification exceptions + //------------------------------------------------------------------ + protected class FairIterator implements Iterator<E> { + E[] elements = null; + int index; + E element = null; + + @SuppressWarnings("unchecked") // Can't create arrays of generic types + public FairIterator() { + final ReentrantLock lock = FairBlockingQueue.this.lock; + lock.lock(); + try { + elements = (E[]) new Object[FairBlockingQueue.this.items.size()]; + FairBlockingQueue.this.items.toArray(elements); + index = 0; + } finally { + lock.unlock(); + } + } + @Override + public boolean hasNext() { + return index<elements.length; + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + element = elements[index++]; + return element; + } + + @Override + public void remove() { + final ReentrantLock lock = FairBlockingQueue.this.lock; + lock.lock(); + try { + if (element!=null) { + FairBlockingQueue.this.items.remove(element); + } + } finally { + lock.unlock(); + } + } + + } +} |