aboutsummaryrefslogtreecommitdiffstats
path: root/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java')
-rw-r--r--dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java584
1 files changed, 0 insertions, 584 deletions
diff --git a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java b/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
deleted file mode 100644
index a4b1f73..0000000
--- a/dblib/common/src/main/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
+++ /dev/null
@@ -1,584 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * openecomp
- * ================================================================================
- * Copyright (C) 2016 - 2017 AT&T
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.jdbc.pool;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
- *
- *
- * An implementation of a blocking queue with fairness waiting and lock dispersal to avoid contention.
- * invocations to method poll(...) will get handed out in the order they were received.
- * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a
- * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state.
- * <br>
- * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
- *
- * @param <E> Type of element in the queue
- */
-
-public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {
-
- final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();
-
- final AtomicInteger putQueue = new AtomicInteger(0);
- final AtomicInteger pollQueue = new AtomicInteger(0);
-
- public int getNextPut() {
- int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT;
- return idx;
- }
-
- public int getNextPoll() {
- int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT;
- return idx;
- }
- /**
- * Phase one entry lock in order to give out
- * per-thread-locks for the waiting phase we have
- * a phase one lock during the contention period.
- */
- private final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];
-
- /**
- * All the objects in the pool are stored in a simple linked list
- */
- final LinkedList<E>[] items;
-
- /**
- * All threads waiting for an object are stored in a linked list
- */
- final LinkedList<ExchangeCountDownLatch<E>>[] waiters;
-
- /**
- * Creates a new fair blocking queue.
- */
- @SuppressWarnings("unchecked") // Can create arrays of generic types
- public MultiLockFairBlockingQueue() {
- items = new LinkedList[LOCK_COUNT];
- waiters = new LinkedList[LOCK_COUNT];
- for (int i=0; i<LOCK_COUNT; i++) {
- items[i] = new LinkedList<>();
- waiters[i] = new LinkedList<>();
- locks[i] = new ReentrantLock(false);
- }
- }
-
- //------------------------------------------------------------------
- // USED BY CONPOOL IMPLEMENTATION
- //------------------------------------------------------------------
- /**
- * Will always return true, queue is unbounded.
- * {@inheritDoc}
- */
- @Override
- public boolean offer(E e) {
- int idx = getNextPut();
- //during the offer, we will grab the main lock
- final ReentrantLock lock = this.locks[idx];
- lock.lock();
- ExchangeCountDownLatch<E> c = null;
- try {
- //check to see if threads are waiting for an object
- if (waiters[idx].size() > 0) {
- //if threads are waiting grab the latch for that thread
- c = waiters[idx].poll();
- //give the object to the thread instead of adding it to the pool
- c.setItem(e);
- } else {
- //we always add first, so that the most recently used object will be given out
- items[idx].addFirst(e);
- }
- } finally {
- lock.unlock();
- }
- //if we exchanged an object with another thread, wake it up.
- if (c!=null) c.countDown();
- //we have an unbounded queue, so always return true
- return true;
- }
-
- /**
- * Will never timeout, as it invokes the {@link #offer(Object)} method.
- * Once a lock has been acquired, the
- * {@inheritDoc}
- */
- @Override
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
- return offer(e);
- }
-
- /**
- * Fair retrieval of an object in the queue.
- * Objects are returned in the order the threads requested them.
- * {@inheritDoc}
- */
- @Override
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- int idx = getNextPoll();
- E result = null;
- final ReentrantLock lock = this.locks[idx];
- try {
- //acquire the global lock until we know what to do
- lock.lock();
- //check to see if we have objects
- result = items[idx].poll();
- if (result==null && timeout>0) {
- //the queue is empty we will wait for an object
- ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
- //add to the bottom of the wait list
- waiters[idx].addLast(c);
- //unlock the global lock
- lock.unlock();
- //wait for the specified timeout
- if (!c.await(timeout, unit)) {
- //if we timed out, remove ourselves from the waitlist
- lock.lock();
- waiters[idx].remove(c);
- lock.unlock();
- }
- //return the item we received, can be null if we timed out
- result = c.getItem();
- } else {
- //we have an object, release
- lock.unlock();
- }
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- return result;
- }
-
- /**
- * Request an item from the queue asynchronously
- * @return - a future pending the result from the queue poll request
- */
- public Future<E> pollAsync() {
- int idx = getNextPoll();
- Future<E> result = null;
- final ReentrantLock lock = this.locks[idx];
- try {
- //grab the global lock
- lock.lock();
- //check to see if we have objects in the queue
- E item = items[idx].poll();
- if (item==null) {
- //queue is empty, add ourselves as waiters
- ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
- waiters[idx].addLast(c);
- //return a future that will wait for the object
- result = new ItemFuture<>(c);
- } else {
- //return a future with the item
- result = new ItemFuture<>(item);
- }
- } finally {
- lock.unlock();
- }
- return result;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean remove(Object e) {
- for (int idx=0; idx<LOCK_COUNT; idx++) {
- final ReentrantLock lock = this.locks[idx];
- lock.lock();
- try {
- boolean result = items[idx].remove(e);
- if (result) return result;
- } finally {
- lock.unlock();
- }
- }
- return false;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int size() {
- int size = 0;
- for (int idx=0; idx<LOCK_COUNT; idx++) {
- size += items[idx].size();
- }
- return size;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Iterator<E> iterator() {
- return new FairIterator();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public E poll() {
- int idx = getNextPoll();
- final ReentrantLock lock = this.locks[idx];
- lock.lock();
- try {
- return items[idx].poll();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean contains(Object e) {
- for (int idx=0; idx<LOCK_COUNT; idx++) {
- boolean result = items[idx].contains(e);
- if (result) return result;
- }
- return false;
- }
-
-
- //------------------------------------------------------------------
- // NOT USED BY CONPOOL IMPLEMENTATION
- //------------------------------------------------------------------
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean add(E e) {
- return offer(e);
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public int drainTo(Collection<? super E> c, int maxElements) {
- throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public int drainTo(Collection<? super E> c) {
- return drainTo(c,Integer.MAX_VALUE);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void put(E e) throws InterruptedException {
- offer(e);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int remainingCapacity() {
- return Integer.MAX_VALUE - size();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public E take() throws InterruptedException {
- return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean addAll(Collection<? extends E> c) {
- Iterator<? extends E> i = c.iterator();
- while (i.hasNext()) {
- E e = i.next();
- offer(e);
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public void clear() {
- throw new UnsupportedOperationException("void clear()");
-
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public boolean containsAll(Collection<?> c) {
- throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isEmpty() {
- return size() == 0;
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public boolean removeAll(Collection<?> c) {
- throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public boolean retainAll(Collection<?> c) {
- throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public Object[] toArray() {
- throw new UnsupportedOperationException("Object[] toArray()");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public <T> T[] toArray(T[] a) {
- throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public E element() {
- throw new UnsupportedOperationException("E element()");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public E peek() {
- throw new UnsupportedOperationException("E peek()");
- }
-
- /**
- * {@inheritDoc}
- * @throws UnsupportedOperationException - this operation is not supported
- */
- @Override
- public E remove() {
- throw new UnsupportedOperationException("E remove()");
- }
-
-
-
- //------------------------------------------------------------------
- // Non cancellable Future used to check and see if a connection has been made available
- //------------------------------------------------------------------
- protected class ItemFuture<T> implements Future<T> {
- protected volatile T item = null;
- protected volatile ExchangeCountDownLatch<T> latch = null;
- protected volatile boolean canceled = false;
-
- public ItemFuture(T item) {
- this.item = item;
- }
-
- public ItemFuture(ExchangeCountDownLatch<T> latch) {
- this.latch = latch;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false; //don't allow cancel for now
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- if (item!=null) {
- return item;
- } else if (latch!=null) {
- latch.await();
- return latch.getItem();
- } else {
- throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
- }
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (item!=null) {
- return item;
- } else if (latch!=null) {
- boolean timedout = !latch.await(timeout, unit);
- if (timedout) throw new TimeoutException();
- else return latch.getItem();
- } else {
- throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
- }
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return (item!=null || latch.getItem()!=null);
- }
-
- }
-
- //------------------------------------------------------------------
- // Count down latch that can be used to exchange information
- //------------------------------------------------------------------
- protected class ExchangeCountDownLatch<T> extends CountDownLatch {
- protected volatile T item;
- public ExchangeCountDownLatch(int i) {
- super(i);
- }
- public T getItem() {
- return item;
- }
- public void setItem(T item) {
- this.item = item;
- }
- }
-
- //------------------------------------------------------------------
- // Iterator safe from concurrent modification exceptions
- //------------------------------------------------------------------
- protected class FairIterator implements Iterator<E> {
- E[] elements = null;
- int index;
- E element = null;
-
- @SuppressWarnings("unchecked") // Can't create arrays of generic types
- public FairIterator() {
- ArrayList<E> list = new ArrayList<>(MultiLockFairBlockingQueue.this.size());
- for (int idx=0; idx<LOCK_COUNT; idx++) {
- final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
- lock.lock();
- try {
- elements = (E[]) new Object[MultiLockFairBlockingQueue.this.items[idx].size()];
- MultiLockFairBlockingQueue.this.items[idx].toArray(elements);
-
- } finally {
- lock.unlock();
- }
- }
- index = 0;
- elements = (E[]) new Object[list.size()];
- list.toArray(elements);
- }
- @Override
- public boolean hasNext() {
- return index<elements.length;
- }
-
- @Override
- public E next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- element = elements[index++];
- return element;
- }
-
- @Override
- public void remove() {
- for (int idx=0; idx<LOCK_COUNT; idx++) {
- final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
- lock.lock();
- try {
- boolean result = MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]);
- if (result) break;
- } finally {
- lock.unlock();
- }
- }
-
- }
-
- }
-}