aboutsummaryrefslogtreecommitdiffstats
path: root/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java')
-rw-r--r--dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java584
1 files changed, 584 insertions, 0 deletions
diff --git a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
new file mode 100644
index 0000000..a4b1f73
--- /dev/null
+++ b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
@@ -0,0 +1,584 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openecomp
+ * ================================================================================
+ * Copyright (C) 2016 - 2017 AT&T
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.jdbc.pool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
+ *
+ *
+ * An implementation of a blocking queue with fairness waiting and lock dispersal to avoid contention.
+ * invocations to method poll(...) will get handed out in the order they were received.
+ * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a
+ * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state.
+ * <br>
+ * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
+ *
+ * @param <E> Type of element in the queue
+ */
+
+public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {
+
+ final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();
+
+ final AtomicInteger putQueue = new AtomicInteger(0);
+ final AtomicInteger pollQueue = new AtomicInteger(0);
+
+ public int getNextPut() {
+ int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT;
+ return idx;
+ }
+
+ public int getNextPoll() {
+ int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT;
+ return idx;
+ }
+ /**
+ * Phase one entry lock in order to give out
+ * per-thread-locks for the waiting phase we have
+ * a phase one lock during the contention period.
+ */
+ private final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];
+
+ /**
+ * All the objects in the pool are stored in a simple linked list
+ */
+ final LinkedList<E>[] items;
+
+ /**
+ * All threads waiting for an object are stored in a linked list
+ */
+ final LinkedList<ExchangeCountDownLatch<E>>[] waiters;
+
+ /**
+ * Creates a new fair blocking queue.
+ */
+ @SuppressWarnings("unchecked") // Can create arrays of generic types
+ public MultiLockFairBlockingQueue() {
+ items = new LinkedList[LOCK_COUNT];
+ waiters = new LinkedList[LOCK_COUNT];
+ for (int i=0; i<LOCK_COUNT; i++) {
+ items[i] = new LinkedList<>();
+ waiters[i] = new LinkedList<>();
+ locks[i] = new ReentrantLock(false);
+ }
+ }
+
+ //------------------------------------------------------------------
+ // USED BY CONPOOL IMPLEMENTATION
+ //------------------------------------------------------------------
+ /**
+ * Will always return true, queue is unbounded.
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean offer(E e) {
+ int idx = getNextPut();
+ //during the offer, we will grab the main lock
+ final ReentrantLock lock = this.locks[idx];
+ lock.lock();
+ ExchangeCountDownLatch<E> c = null;
+ try {
+ //check to see if threads are waiting for an object
+ if (waiters[idx].size() > 0) {
+ //if threads are waiting grab the latch for that thread
+ c = waiters[idx].poll();
+ //give the object to the thread instead of adding it to the pool
+ c.setItem(e);
+ } else {
+ //we always add first, so that the most recently used object will be given out
+ items[idx].addFirst(e);
+ }
+ } finally {
+ lock.unlock();
+ }
+ //if we exchanged an object with another thread, wake it up.
+ if (c!=null) c.countDown();
+ //we have an unbounded queue, so always return true
+ return true;
+ }
+
+ /**
+ * Will never timeout, as it invokes the {@link #offer(Object)} method.
+ * Once a lock has been acquired, the
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+ return offer(e);
+ }
+
+ /**
+ * Fair retrieval of an object in the queue.
+ * Objects are returned in the order the threads requested them.
+ * {@inheritDoc}
+ */
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ int idx = getNextPoll();
+ E result = null;
+ final ReentrantLock lock = this.locks[idx];
+ try {
+ //acquire the global lock until we know what to do
+ lock.lock();
+ //check to see if we have objects
+ result = items[idx].poll();
+ if (result==null && timeout>0) {
+ //the queue is empty we will wait for an object
+ ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
+ //add to the bottom of the wait list
+ waiters[idx].addLast(c);
+ //unlock the global lock
+ lock.unlock();
+ //wait for the specified timeout
+ if (!c.await(timeout, unit)) {
+ //if we timed out, remove ourselves from the waitlist
+ lock.lock();
+ waiters[idx].remove(c);
+ lock.unlock();
+ }
+ //return the item we received, can be null if we timed out
+ result = c.getItem();
+ } else {
+ //we have an object, release
+ lock.unlock();
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Request an item from the queue asynchronously
+ * @return - a future pending the result from the queue poll request
+ */
+ public Future<E> pollAsync() {
+ int idx = getNextPoll();
+ Future<E> result = null;
+ final ReentrantLock lock = this.locks[idx];
+ try {
+ //grab the global lock
+ lock.lock();
+ //check to see if we have objects in the queue
+ E item = items[idx].poll();
+ if (item==null) {
+ //queue is empty, add ourselves as waiters
+ ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
+ waiters[idx].addLast(c);
+ //return a future that will wait for the object
+ result = new ItemFuture<>(c);
+ } else {
+ //return a future with the item
+ result = new ItemFuture<>(item);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean remove(Object e) {
+ for (int idx=0; idx<LOCK_COUNT; idx++) {
+ final ReentrantLock lock = this.locks[idx];
+ lock.lock();
+ try {
+ boolean result = items[idx].remove(e);
+ if (result) return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int size() {
+ int size = 0;
+ for (int idx=0; idx<LOCK_COUNT; idx++) {
+ size += items[idx].size();
+ }
+ return size;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Iterator<E> iterator() {
+ return new FairIterator();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public E poll() {
+ int idx = getNextPoll();
+ final ReentrantLock lock = this.locks[idx];
+ lock.lock();
+ try {
+ return items[idx].poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean contains(Object e) {
+ for (int idx=0; idx<LOCK_COUNT; idx++) {
+ boolean result = items[idx].contains(e);
+ if (result) return result;
+ }
+ return false;
+ }
+
+
+ //------------------------------------------------------------------
+ // NOT USED BY CONPOOL IMPLEMENTATION
+ //------------------------------------------------------------------
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean add(E e) {
+ return offer(e);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ return drainTo(c,Integer.MAX_VALUE);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void put(E e) throws InterruptedException {
+ offer(e);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE - size();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public E take() throws InterruptedException {
+ return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean addAll(Collection<? extends E> c) {
+ Iterator<? extends E> i = c.iterator();
+ while (i.hasNext()) {
+ E e = i.next();
+ offer(e);
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException("void clear()");
+
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException("Object[] toArray()");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public E element() {
+ throw new UnsupportedOperationException("E element()");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public E peek() {
+ throw new UnsupportedOperationException("E peek()");
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
+ @Override
+ public E remove() {
+ throw new UnsupportedOperationException("E remove()");
+ }
+
+
+
+ //------------------------------------------------------------------
+ // Non cancellable Future used to check and see if a connection has been made available
+ //------------------------------------------------------------------
+ protected class ItemFuture<T> implements Future<T> {
+ protected volatile T item = null;
+ protected volatile ExchangeCountDownLatch<T> latch = null;
+ protected volatile boolean canceled = false;
+
+ public ItemFuture(T item) {
+ this.item = item;
+ }
+
+ public ItemFuture(ExchangeCountDownLatch<T> latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false; //don't allow cancel for now
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ if (item!=null) {
+ return item;
+ } else if (latch!=null) {
+ latch.await();
+ return latch.getItem();
+ } else {
+ throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
+ }
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (item!=null) {
+ return item;
+ } else if (latch!=null) {
+ boolean timedout = !latch.await(timeout, unit);
+ if (timedout) throw new TimeoutException();
+ else return latch.getItem();
+ } else {
+ throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return (item!=null || latch.getItem()!=null);
+ }
+
+ }
+
+ //------------------------------------------------------------------
+ // Count down latch that can be used to exchange information
+ //------------------------------------------------------------------
+ protected class ExchangeCountDownLatch<T> extends CountDownLatch {
+ protected volatile T item;
+ public ExchangeCountDownLatch(int i) {
+ super(i);
+ }
+ public T getItem() {
+ return item;
+ }
+ public void setItem(T item) {
+ this.item = item;
+ }
+ }
+
+ //------------------------------------------------------------------
+ // Iterator safe from concurrent modification exceptions
+ //------------------------------------------------------------------
+ protected class FairIterator implements Iterator<E> {
+ E[] elements = null;
+ int index;
+ E element = null;
+
+ @SuppressWarnings("unchecked") // Can't create arrays of generic types
+ public FairIterator() {
+ ArrayList<E> list = new ArrayList<>(MultiLockFairBlockingQueue.this.size());
+ for (int idx=0; idx<LOCK_COUNT; idx++) {
+ final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
+ lock.lock();
+ try {
+ elements = (E[]) new Object[MultiLockFairBlockingQueue.this.items[idx].size()];
+ MultiLockFairBlockingQueue.this.items[idx].toArray(elements);
+
+ } finally {
+ lock.unlock();
+ }
+ }
+ index = 0;
+ elements = (E[]) new Object[list.size()];
+ list.toArray(elements);
+ }
+ @Override
+ public boolean hasNext() {
+ return index<elements.length;
+ }
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ element = elements[index++];
+ return element;
+ }
+
+ @Override
+ public void remove() {
+ for (int idx=0; idx<LOCK_COUNT; idx++) {
+ final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
+ lock.lock();
+ try {
+ boolean result = MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]);
+ if (result) break;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ }
+
+ }
+}