aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-12-30 14:39:13 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-30 14:39:36 -0800
commitedc96d81df66c5cb36e13fe93ab47b66a0a8a02b (patch)
tree5de28ec14cc29da7c773792a8db61172943144e0 /streaming
parent7a245412f7b1337c766981f43bcbb64890439002 (diff)
downloadspark-edc96d81df66c5cb36e13fe93ab47b66a0a8a02b.tar.gz
spark-edc96d81df66c5cb36e13fe93ab47b66a0a8a02b.tar.bz2
spark-edc96d81df66c5cb36e13fe93ab47b66a0a8a02b.zip
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing <zsxwing@gmail.com> Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' (cherry picked from commit 6a897829444e2ef273586511f93a40d36e64fb0b) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala63
1 files changed, 48 insertions, 15 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
index a0aeacbc73..fdbbe2aa6e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
@@ -17,30 +17,63 @@
package org.apache.spark.streaming
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
private[streaming] class ContextWaiter {
+
+ private val lock = new ReentrantLock()
+ private val condition = lock.newCondition()
+
+ // Guarded by "lock"
private var error: Throwable = null
- private var stopped: Boolean = false
- def notifyError(e: Throwable) = synchronized {
- error = e
- notifyAll()
- }
+ // Guarded by "lock"
+ private var stopped: Boolean = false
- def notifyStop() = synchronized {
- stopped = true
- notifyAll()
+ def notifyError(e: Throwable): Unit = {
+ lock.lock()
+ try {
+ error = e
+ condition.signalAll()
+ } finally {
+ lock.unlock()
+ }
}
- def waitForStopOrError(timeout: Long = -1) = synchronized {
- // If already had error, then throw it
- if (error != null) {
- throw error
+ def notifyStop(): Unit = {
+ lock.lock()
+ try {
+ stopped = true
+ condition.signalAll()
+ } finally {
+ lock.unlock()
}
+ }
- // If not already stopped, then wait
- if (!stopped) {
- if (timeout < 0) wait() else wait(timeout)
+ /**
+ * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+ * `false` if the waiting time detectably elapsed before return from the method.
+ */
+ def waitForStopOrError(timeout: Long = -1): Boolean = {
+ lock.lock()
+ try {
+ if (timeout < 0) {
+ while (!stopped && error == null) {
+ condition.await()
+ }
+ } else {
+ var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
+ while (!stopped && error == null && nanos > 0) {
+ nanos = condition.awaitNanos(nanos)
+ }
+ }
+ // If already had error, then throw it
if (error != null) throw error
+ // already stopped or timeout
+ stopped
+ } finally {
+ lock.unlock()
}
}
}