From 6a897829444e2ef273586511f93a40d36e64fb0b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 30 Dec 2014 14:39:13 -0800 Subject: [SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' --- .../org/apache/spark/streaming/ContextWaiter.scala | 63 ++++++++++++++++------ 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index a0aeacbc73..fdbbe2aa6e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -17,30 +17,63 @@ package org.apache.spark.streaming +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + private[streaming] class ContextWaiter { + + private val lock = new ReentrantLock() + private val condition = lock.newCondition() + + // Guarded by "lock" private var error: Throwable = null - private var stopped: Boolean = false - def notifyError(e: Throwable) = synchronized { - error = e - notifyAll() - } + // Guarded by "lock" + private var stopped: Boolean = false - def notifyStop() = synchronized { - stopped = true - notifyAll() + def notifyError(e: Throwable): Unit = { + lock.lock() + try { + error = e + condition.signalAll() + } finally { + lock.unlock() + } } - def waitForStopOrError(timeout: Long = -1) = synchronized { - // If already had error, then throw it - if (error != null) { - throw error + def notifyStop(): Unit = { + lock.lock() + try { + stopped = true + condition.signalAll() + } finally { + lock.unlock() } + } - // If not already stopped, then wait - if (!stopped) { - if (timeout < 0) wait() else wait(timeout) + /** + * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or + * `false` if the waiting time detectably elapsed before return from the method. + */ + def waitForStopOrError(timeout: Long = -1): Boolean = { + lock.lock() + try { + if (timeout < 0) { + while (!stopped && error == null) { + condition.await() + } + } else { + var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) + while (!stopped && error == null && nanos > 0) { + nanos = condition.awaitNanos(nanos) + } + } + // If already had error, then throw it if (error != null) throw error + // already stopped or timeout + stopped + } finally { + lock.unlock() } } } -- cgit v1.2.3