summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala')
-rw-r--r--test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala326
1 files changed, 0 insertions, 326 deletions
diff --git a/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala b/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
deleted file mode 100644
index f8deda746c..0000000000
--- a/test/disabled/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
+++ /dev/null
@@ -1,326 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.util
-
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{ TimeUnit, BlockingQueue }
-import java.util.{ AbstractQueue, Queue, Collection, Iterator }
-
-class BoundedBlockingQueue[E <: AnyRef](
- val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
-
- backing match {
- case null => throw new IllegalArgumentException("Backing Queue may not be null")
- case b: BlockingQueue[_] =>
- require(maxCapacity > 0)
- require(b.size() == 0)
- require(b.remainingCapacity >= maxCapacity)
- case b: Queue[_] =>
- require(b.size() == 0)
- require(maxCapacity > 0)
- }
-
- protected val lock = new ReentrantLock(false)
-
- private val notEmpty = lock.newCondition()
- private val notFull = lock.newCondition()
-
- def put(e: E): Unit = { //Blocks until not full
- if (e eq null) throw new NullPointerException
- lock.lock()
- try {
- while (backing.size() == maxCapacity)
- notFull.await()
- require(backing.offer(e))
- notEmpty.signal()
- } finally {
- lock.unlock()
- }
- }
-
- def take(): E = { //Blocks until not empty
- lock.lockInterruptibly()
- try {
- while (backing.size() == 0)
- notEmpty.await()
- val e = backing.poll()
- require(e ne null)
- notFull.signal()
- e
- } finally {
- lock.unlock()
- }
- }
-
- def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
- if (e eq null) throw new NullPointerException
- lock.lock()
- try {
- if (backing.size() == maxCapacity) false
- else {
- require(backing.offer(e)) //Should never fail
- notEmpty.signal()
- true
- }
- } finally {
- lock.unlock()
- }
- }
-
- def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
- if (e eq null) throw new NullPointerException
- var nanos = unit.toNanos(timeout)
- lock.lockInterruptibly()
- try {
- while (backing.size() == maxCapacity) {
- if (nanos <= 0)
- return false
- else
- nanos = notFull.awaitNanos(nanos)
- }
- require(backing.offer(e)) //Should never fail
- notEmpty.signal()
- true
- } finally {
- lock.unlock()
- }
- }
-
- def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
- var nanos = unit.toNanos(timeout)
- lock.lockInterruptibly()
- try {
- var result: E = null.asInstanceOf[E]
- var hasResult = false
- while (!hasResult) {
- hasResult = backing.poll() match {
- case null if nanos <= 0 =>
- result = null.asInstanceOf[E]
- true
- case null =>
- try {
- nanos = notEmpty.awaitNanos(nanos)
- } catch {
- case ie: InterruptedException =>
- notEmpty.signal()
- throw ie
- }
- false
- case e =>
- notFull.signal()
- result = e
- true
- }
- }
- result
- } finally {
- lock.unlock()
- }
- }
-
- def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
- lock.lock()
- try {
- backing.poll() match {
- case null => null.asInstanceOf[E]
- case e =>
- notFull.signal()
- e
- }
- } finally {
- lock.unlock
- }
- }
-
- override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
- if (e eq null) throw new NullPointerException
- lock.lock()
- try {
- if (backing remove e) {
- notFull.signal()
- true
- } else false
- } finally {
- lock.unlock()
- }
- }
-
- override def contains(e: AnyRef): Boolean = {
- if (e eq null) throw new NullPointerException
- lock.lock()
- try {
- backing contains e
- } finally {
- lock.unlock()
- }
- }
-
- override def clear(): Unit = {
- lock.lock()
- try {
- backing.clear
- } finally {
- lock.unlock()
- }
- }
-
- def remainingCapacity(): Int = {
- lock.lock()
- try {
- maxCapacity - backing.size()
- } finally {
- lock.unlock()
- }
- }
-
- def size(): Int = {
- lock.lock()
- try {
- backing.size()
- } finally {
- lock.unlock()
- }
- }
-
- def peek(): E = {
- lock.lock()
- try {
- backing.peek()
- } finally {
- lock.unlock()
- }
- }
-
- def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
-
- def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
- if (c eq null) throw new NullPointerException
- if (c eq this) throw new IllegalArgumentException
- if (maxElements <= 0) 0
- else {
- lock.lock()
- try {
- var n = 0
- var e: E = null.asInstanceOf[E]
- while (n < maxElements) {
- backing.poll() match {
- case null => return n
- case e =>
- c add e
- n += 1
- }
- }
- n
- } finally {
- lock.unlock()
- }
- }
- }
-
- override def containsAll(c: Collection[_]): Boolean = {
- lock.lock()
- try {
- backing containsAll c
- } finally {
- lock.unlock()
- }
- }
-
- override def removeAll(c: Collection[_]): Boolean = {
- lock.lock()
- try {
- if (backing.removeAll(c)) {
- val sz = backing.size()
- if (sz < maxCapacity) notFull.signal()
- if (sz > 0) notEmpty.signal() //FIXME needed?
- true
- } else false
- } finally {
- lock.unlock()
- }
- }
-
- override def retainAll(c: Collection[_]): Boolean = {
- lock.lock()
- try {
- if (backing.retainAll(c)) {
- val sz = backing.size()
- if (sz < maxCapacity) notFull.signal() //FIXME needed?
- if (sz > 0) notEmpty.signal()
- true
- } else false
- } finally {
- lock.unlock()
- }
- }
-
- def iterator(): Iterator[E] = {
- lock.lock
- try {
- val elements = backing.toArray
- new Iterator[E] {
- var at = 0
- var last = -1
-
- def hasNext(): Boolean = at < elements.length
-
- def next(): E = {
- if (at >= elements.length) throw new NoSuchElementException
- last = at
- at += 1
- elements(last).asInstanceOf[E]
- }
-
- def remove(): Unit = {
- if (last < 0) throw new IllegalStateException
- val target = elements(last)
- last = -1 //To avoid 2 subsequent removes without a next in between
- lock.lock()
- try {
- val i = backing.iterator()
- while (i.hasNext) {
- if (i.next eq target) {
- i.remove()
- notFull.signal()
- return ()
- }
- }
- } finally {
- lock.unlock()
- }
- }
- }
- } finally {
- lock.unlock
- }
- }
-
- override def toArray(): Array[AnyRef] = {
- lock.lock()
- try {
- backing.toArray
- } finally {
- lock.unlock()
- }
- }
-
- override def isEmpty(): Boolean = {
- lock.lock()
- try {
- backing.isEmpty()
- } finally {
- lock.unlock()
- }
- }
-
- override def toArray[X](a: Array[X with AnyRef]) = {
- lock.lock()
- try {
- backing.toArray[X](a)
- } finally {
- lock.unlock()
- }
- }
-}