summaryrefslogtreecommitdiff
path: root/test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
diff options
context:
space:
mode:
Diffstat (limited to 'test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala')
-rw-r--r--test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala326
1 files changed, 326 insertions, 0 deletions
diff --git a/test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala b/test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
new file mode 100644
index 0000000000..f8deda746c
--- /dev/null
+++ b/test/files/presentation/akka/src/akka/util/BoundedBlockingQueue.scala
@@ -0,0 +1,326 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.util
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{ TimeUnit, BlockingQueue }
+import java.util.{ AbstractQueue, Queue, Collection, Iterator }
+
+class BoundedBlockingQueue[E <: AnyRef](
+ val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
+
+ backing match {
+ case null => throw new IllegalArgumentException("Backing Queue may not be null")
+ case b: BlockingQueue[_] =>
+ require(maxCapacity > 0)
+ require(b.size() == 0)
+ require(b.remainingCapacity >= maxCapacity)
+ case b: Queue[_] =>
+ require(b.size() == 0)
+ require(maxCapacity > 0)
+ }
+
+ protected val lock = new ReentrantLock(false)
+
+ private val notEmpty = lock.newCondition()
+ private val notFull = lock.newCondition()
+
+ def put(e: E): Unit = { //Blocks until not full
+ if (e eq null) throw new NullPointerException
+ lock.lock()
+ try {
+ while (backing.size() == maxCapacity)
+ notFull.await()
+ require(backing.offer(e))
+ notEmpty.signal()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def take(): E = { //Blocks until not empty
+ lock.lockInterruptibly()
+ try {
+ while (backing.size() == 0)
+ notEmpty.await()
+ val e = backing.poll()
+ require(e ne null)
+ notFull.signal()
+ e
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
+ if (e eq null) throw new NullPointerException
+ lock.lock()
+ try {
+ if (backing.size() == maxCapacity) false
+ else {
+ require(backing.offer(e)) //Should never fail
+ notEmpty.signal()
+ true
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
+ if (e eq null) throw new NullPointerException
+ var nanos = unit.toNanos(timeout)
+ lock.lockInterruptibly()
+ try {
+ while (backing.size() == maxCapacity) {
+ if (nanos <= 0)
+ return false
+ else
+ nanos = notFull.awaitNanos(nanos)
+ }
+ require(backing.offer(e)) //Should never fail
+ notEmpty.signal()
+ true
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
+ var nanos = unit.toNanos(timeout)
+ lock.lockInterruptibly()
+ try {
+ var result: E = null.asInstanceOf[E]
+ var hasResult = false
+ while (!hasResult) {
+ hasResult = backing.poll() match {
+ case null if nanos <= 0 =>
+ result = null.asInstanceOf[E]
+ true
+ case null =>
+ try {
+ nanos = notEmpty.awaitNanos(nanos)
+ } catch {
+ case ie: InterruptedException =>
+ notEmpty.signal()
+ throw ie
+ }
+ false
+ case e =>
+ notFull.signal()
+ result = e
+ true
+ }
+ }
+ result
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
+ lock.lock()
+ try {
+ backing.poll() match {
+ case null => null.asInstanceOf[E]
+ case e =>
+ notFull.signal()
+ e
+ }
+ } finally {
+ lock.unlock
+ }
+ }
+
+ override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
+ if (e eq null) throw new NullPointerException
+ lock.lock()
+ try {
+ if (backing remove e) {
+ notFull.signal()
+ true
+ } else false
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def contains(e: AnyRef): Boolean = {
+ if (e eq null) throw new NullPointerException
+ lock.lock()
+ try {
+ backing contains e
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def clear(): Unit = {
+ lock.lock()
+ try {
+ backing.clear
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def remainingCapacity(): Int = {
+ lock.lock()
+ try {
+ maxCapacity - backing.size()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def size(): Int = {
+ lock.lock()
+ try {
+ backing.size()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def peek(): E = {
+ lock.lock()
+ try {
+ backing.peek()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
+
+ def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
+ if (c eq null) throw new NullPointerException
+ if (c eq this) throw new IllegalArgumentException
+ if (maxElements <= 0) 0
+ else {
+ lock.lock()
+ try {
+ var n = 0
+ var e: E = null.asInstanceOf[E]
+ while (n < maxElements) {
+ backing.poll() match {
+ case null => return n
+ case e =>
+ c add e
+ n += 1
+ }
+ }
+ n
+ } finally {
+ lock.unlock()
+ }
+ }
+ }
+
+ override def containsAll(c: Collection[_]): Boolean = {
+ lock.lock()
+ try {
+ backing containsAll c
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def removeAll(c: Collection[_]): Boolean = {
+ lock.lock()
+ try {
+ if (backing.removeAll(c)) {
+ val sz = backing.size()
+ if (sz < maxCapacity) notFull.signal()
+ if (sz > 0) notEmpty.signal() //FIXME needed?
+ true
+ } else false
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def retainAll(c: Collection[_]): Boolean = {
+ lock.lock()
+ try {
+ if (backing.retainAll(c)) {
+ val sz = backing.size()
+ if (sz < maxCapacity) notFull.signal() //FIXME needed?
+ if (sz > 0) notEmpty.signal()
+ true
+ } else false
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def iterator(): Iterator[E] = {
+ lock.lock
+ try {
+ val elements = backing.toArray
+ new Iterator[E] {
+ var at = 0
+ var last = -1
+
+ def hasNext(): Boolean = at < elements.length
+
+ def next(): E = {
+ if (at >= elements.length) throw new NoSuchElementException
+ last = at
+ at += 1
+ elements(last).asInstanceOf[E]
+ }
+
+ def remove(): Unit = {
+ if (last < 0) throw new IllegalStateException
+ val target = elements(last)
+ last = -1 //To avoid 2 subsequent removes without a next in between
+ lock.lock()
+ try {
+ val i = backing.iterator()
+ while (i.hasNext) {
+ if (i.next eq target) {
+ i.remove()
+ notFull.signal()
+ return ()
+ }
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
+ }
+ } finally {
+ lock.unlock
+ }
+ }
+
+ override def toArray(): Array[AnyRef] = {
+ lock.lock()
+ try {
+ backing.toArray
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def isEmpty(): Boolean = {
+ lock.lock()
+ try {
+ backing.isEmpty()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ override def toArray[X](a: Array[X with AnyRef]) = {
+ lock.lock()
+ try {
+ backing.toArray[X](a)
+ } finally {
+ lock.unlock()
+ }
+ }
+}