diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-06-21 12:42:49 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-21 12:42:49 -0700 |
commit | c399c7f0e485dcfc6cbc343bc246b8adc3f0648c (patch) | |
tree | 09bdd5b14c14a4e2bbd7a90f7e772d9e1a867a4b /core/src | |
parent | f4a3d45e38f18278bbdb7cc32486ded50f76d54b (diff) | |
download | spark-c399c7f0e485dcfc6cbc343bc246b8adc3f0648c.tar.gz spark-c399c7f0e485dcfc6cbc343bc246b8adc3f0648c.tar.bz2 spark-c399c7f0e485dcfc6cbc343bc246b8adc3f0648c.zip |
[SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage
## What changes were proposed in this pull request?
Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.
## How was this patch tested?
Existing unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13718 from zsxwing/SPARK-16002.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/ManualClock.scala | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index e7a65d74a4..91a9587101 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -26,6 +26,8 @@ package org.apache.spark.util */ private[spark] class ManualClock(private var time: Long) extends Clock { + private var _isWaiting = false + /** * @return `ManualClock` with initial time 0 */ @@ -57,9 +59,19 @@ private[spark] class ManualClock(private var time: Long) extends Clock { * @return current time reported by the clock when waiting finishes */ def waitTillTime(targetTime: Long): Long = synchronized { - while (time < targetTime) { - wait(10) + _isWaiting = true + try { + while (time < targetTime) { + wait(10) + } + getTimeMillis() + } finally { + _isWaiting = false } - getTimeMillis() } + + /** + * Returns whether there is any thread being blocked in `waitTillTime`. + */ + def isWaiting: Boolean = synchronized { _isWaiting } } |