diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala | 326 |
1 files changed, 0 insertions, 326 deletions
diff --git a/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala b/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala deleted file mode 100644 index f8deda746c..0000000000 --- a/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala +++ /dev/null @@ -1,326 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.util - -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.{ TimeUnit, BlockingQueue } -import java.util.{ AbstractQueue, Queue, Collection, Iterator } - -class BoundedBlockingQueue[E <: AnyRef]( - val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { - - backing match { - case null => throw new IllegalArgumentException("Backing Queue may not be null") - case b: BlockingQueue[_] => - require(maxCapacity > 0) - require(b.size() == 0) - require(b.remainingCapacity >= maxCapacity) - case b: Queue[_] => - require(b.size() == 0) - require(maxCapacity > 0) - } - - protected val lock = new ReentrantLock(false) - - private val notEmpty = lock.newCondition() - private val notFull = lock.newCondition() - - def put(e: E): Unit = { //Blocks until not full - if (e eq null) throw new NullPointerException - lock.lock() - try { - while (backing.size() == maxCapacity) - notFull.await() - require(backing.offer(e)) - notEmpty.signal() - } finally { - lock.unlock() - } - } - - def take(): E = { //Blocks until not empty - lock.lockInterruptibly() - try { - while (backing.size() == 0) - notEmpty.await() - val e = backing.poll() - require(e ne null) - notFull.signal() - e - } finally { - lock.unlock() - } - } - - def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false - if (e eq null) throw new NullPointerException - lock.lock() - try { - if (backing.size() == maxCapacity) false - else { - require(backing.offer(e)) //Should never fail - notEmpty.signal() - true - } - } finally { - lock.unlock() - } - } - - def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail - if (e eq null) throw new NullPointerException - var nanos = unit.toNanos(timeout) - lock.lockInterruptibly() - try { - while (backing.size() == maxCapacity) { - if (nanos <= 0) - return false - else - nanos = notFull.awaitNanos(nanos) - } - require(backing.offer(e)) //Should never fail - notEmpty.signal() - true - } finally { - lock.unlock() - } - } - - def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail - var nanos = unit.toNanos(timeout) - lock.lockInterruptibly() - try { - var result: E = null.asInstanceOf[E] - var hasResult = false - while (!hasResult) { - hasResult = backing.poll() match { - case null if nanos <= 0 => - result = null.asInstanceOf[E] - true - case null => - try { - nanos = notEmpty.awaitNanos(nanos) - } catch { - case ie: InterruptedException => - notEmpty.signal() - throw ie - } - false - case e => - notFull.signal() - result = e - true - } - } - result - } finally { - lock.unlock() - } - } - - def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null - lock.lock() - try { - backing.poll() match { - case null => null.asInstanceOf[E] - case e => - notFull.signal() - e - } - } finally { - lock.unlock - } - } - - override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false - if (e eq null) throw new NullPointerException - lock.lock() - try { - if (backing remove e) { - notFull.signal() - true - } else false - } finally { - lock.unlock() - } - } - - override def contains(e: AnyRef): Boolean = { - if (e eq null) throw new NullPointerException - lock.lock() - try { - backing contains e - } finally { - lock.unlock() - } - } - - override def clear(): Unit = { - lock.lock() - try { - backing.clear - } finally { - lock.unlock() - } - } - - def remainingCapacity(): Int = { - lock.lock() - try { - maxCapacity - backing.size() - } finally { - lock.unlock() - } - } - - def size(): Int = { - lock.lock() - try { - backing.size() - } finally { - lock.unlock() - } - } - - def peek(): E = { - lock.lock() - try { - backing.peek() - } finally { - lock.unlock() - } - } - - def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) - - def drainTo(c: Collection[_ >: E], maxElements: Int): Int = { - if (c eq null) throw new NullPointerException - if (c eq this) throw new IllegalArgumentException - if (maxElements <= 0) 0 - else { - lock.lock() - try { - var n = 0 - var e: E = null.asInstanceOf[E] - while (n < maxElements) { - backing.poll() match { - case null => return n - case e => - c add e - n += 1 - } - } - n - } finally { - lock.unlock() - } - } - } - - override def containsAll(c: Collection[_]): Boolean = { - lock.lock() - try { - backing containsAll c - } finally { - lock.unlock() - } - } - - override def removeAll(c: Collection[_]): Boolean = { - lock.lock() - try { - if (backing.removeAll(c)) { - val sz = backing.size() - if (sz < maxCapacity) notFull.signal() - if (sz > 0) notEmpty.signal() //FIXME needed? - true - } else false - } finally { - lock.unlock() - } - } - - override def retainAll(c: Collection[_]): Boolean = { - lock.lock() - try { - if (backing.retainAll(c)) { - val sz = backing.size() - if (sz < maxCapacity) notFull.signal() //FIXME needed? - if (sz > 0) notEmpty.signal() - true - } else false - } finally { - lock.unlock() - } - } - - def iterator(): Iterator[E] = { - lock.lock - try { - val elements = backing.toArray - new Iterator[E] { - var at = 0 - var last = -1 - - def hasNext(): Boolean = at < elements.length - - def next(): E = { - if (at >= elements.length) throw new NoSuchElementException - last = at - at += 1 - elements(last).asInstanceOf[E] - } - - def remove(): Unit = { - if (last < 0) throw new IllegalStateException - val target = elements(last) - last = -1 //To avoid 2 subsequent removes without a next in between - lock.lock() - try { - val i = backing.iterator() - while (i.hasNext) { - if (i.next eq target) { - i.remove() - notFull.signal() - return () - } - } - } finally { - lock.unlock() - } - } - } - } finally { - lock.unlock - } - } - - override def toArray(): Array[AnyRef] = { - lock.lock() - try { - backing.toArray - } finally { - lock.unlock() - } - } - - override def isEmpty(): Boolean = { - lock.lock() - try { - backing.isEmpty() - } finally { - lock.unlock() - } - } - - override def toArray[X](a: Array[X with AnyRef]) = { - lock.lock() - try { - backing.toArray[X](a) - } finally { - lock.unlock() - } - } -} |