diff options
author | zsxwing <zsxwing@gmail.com> | 2014-12-30 14:39:13 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-12-30 14:39:13 -0800 |
commit | 6a897829444e2ef273586511f93a40d36e64fb0b (patch) | |
tree | d48ba53b27c79f0abb7816aeb7682250521c7ebb /streaming | |
parent | 0f31992c61f6662e5347745f6a1ac272a5fd63c9 (diff) | |
download | spark-6a897829444e2ef273586511f93a40d36e64fb0b.tar.gz spark-6a897829444e2ef273586511f93a40d36e64fb0b.tar.bz2 spark-6a897829444e2ef273586511f93a40d36e64fb0b.zip |
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout.
Author: zsxwing <zsxwing@gmail.com>
Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits:
52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala | 63 |
1 files changed, 48 insertions, 15 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index a0aeacbc73..fdbbe2aa6e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -17,30 +17,63 @@ package org.apache.spark.streaming +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + private[streaming] class ContextWaiter { + + private val lock = new ReentrantLock() + private val condition = lock.newCondition() + + // Guarded by "lock" private var error: Throwable = null - private var stopped: Boolean = false - def notifyError(e: Throwable) = synchronized { - error = e - notifyAll() - } + // Guarded by "lock" + private var stopped: Boolean = false - def notifyStop() = synchronized { - stopped = true - notifyAll() + def notifyError(e: Throwable): Unit = { + lock.lock() + try { + error = e + condition.signalAll() + } finally { + lock.unlock() + } } - def waitForStopOrError(timeout: Long = -1) = synchronized { - // If already had error, then throw it - if (error != null) { - throw error + def notifyStop(): Unit = { + lock.lock() + try { + stopped = true + condition.signalAll() + } finally { + lock.unlock() } + } - // If not already stopped, then wait - if (!stopped) { - if (timeout < 0) wait() else wait(timeout) + /** + * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or + * `false` if the waiting time detectably elapsed before return from the method. + */ + def waitForStopOrError(timeout: Long = -1): Boolean = { + lock.lock() + try { + if (timeout < 0) { + while (!stopped && error == null) { + condition.await() + } + } else { + var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) + while (!stopped && error == null && nanos > 0) { + nanos = condition.awaitNanos(nanos) + } + } + // If already had error, then throw it if (error != null) throw error + // already stopped or timeout + stopped + } finally { + lock.unlock() } } } |