author     Shixiong Zhu <shixiong@databricks.com>    2016-06-21 12:42:49 -0700
committer  Yin Huai <yhuai@databricks.com>           2016-06-21 12:42:49 -0700
commit     c399c7f0e485dcfc6cbc343bc246b8adc3f0648c
tree       09bdd5b14c14a4e2bbd7a90f7e772d9e1a867a4b /sql
parent     f4a3d45e38f18278bbdb7cc32486ded50f76d54b
[SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage
## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13718 from zsxwing/SPARK-16002.
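The new knob is registered as `spark.sql.streaming.pollingDelay` in SQLConf (see the diff below). A rough sketch of overriding it for a session (the conf is marked internal, and the session setup here is hypothetical rather than part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session; only the conf key and its 10ms default come from this patch.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("polling-delay-demo")
  .getOrCreate()

// Wait 100 milliseconds (instead of the 10ms default) before re-polling
// when a trigger finds no new data.
spark.conf.set("spark.sql.streaming.pollingDelay", "100ms")
```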
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala   | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala       | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala        | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                           | 9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                       | 5
6 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index d96cf1bf07..f713fdec4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -82,7 +82,7 @@ class ListingFileCatalog(
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
- logInfo(s"Listing $path on driver")
+ logTrace(s"Listing $path on driver")
Try {
HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
}.getOrElse(Array.empty[FileStatus])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4ac555be7f..521eb7ffcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -389,7 +389,7 @@ private[sql] object HadoopFsRelation extends Logging {
// tasks/jobs may leave partial/corrupted data files there. Files and directories whose name
// start with "." are also ignored.
def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
- logInfo(s"Listing ${status.getPath}")
+ logTrace(s"Listing ${status.getPath}")
val name = status.getPath.getName.toLowerCase
if (shouldFilterOut(name)) {
Array.empty
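Both hunks above demote the per-path "Listing ..." messages from INFO to TRACE. If those messages are still wanted for debugging, the logger level for the datasources package can be raised; a minimal sketch, assuming the log4j 1.x API that Spark bundled at the time:

```scala
import org.apache.log4j.{Level, Logger}

// Re-enable the per-path listing messages that this patch demotes from INFO to TRACE.
Logger.getLogger("org.apache.spark.sql.execution.datasources").setLevel(Level.TRACE)
```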
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 9886ad0b41..11bf3c0bd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -120,7 +120,13 @@ class FileStreamSource(
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().map(_.getPath.toUri.toString)
val endTime = System.nanoTime
- logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+ val listingTimeMs = (endTime.toDouble - startTime) / 1000000
+ if (listingTimeMs > 2000) {
+ // Output a warning when listing files uses more than 2 seconds.
+ logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+ } else {
+ logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+ }
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
files
}
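The hunk above only surfaces the listing at WARN when it is slow (over 2 seconds) and keeps the common case at TRACE. A self-contained sketch of the same time-and-threshold pattern; the helper name and the println stand-in are illustrative, not Spark API:

```scala
// Time a block in milliseconds and only report it when it exceeds a threshold.
def logIfSlow[T](thresholdMs: Double = 2000.0)(body: => T): T = {
  val start = System.nanoTime
  val result = body
  val elapsedMs = (System.nanoTime - start).toDouble / 1e6
  if (elapsedMs > thresholdMs) {
    println(f"WARN: block took $elapsedMs%.1f ms") // stand-in for logWarning
  }
  result
}
```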
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bb42a11759..1428b97149 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -56,6 +57,8 @@ class StreamExecution(
import org.apache.spark.sql.streaming.StreamingQueryListener._
+ private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -190,6 +193,8 @@ class StreamExecution(
runBatch()
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
+ } else {
+ Thread.sleep(pollingDelayMs)
}
true
} else {
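Together with the conf read above, the fix is simply to sleep for the configured delay whenever a trigger finds no new data, instead of spinning through empty micro-batches at 100% CPU. A stripped-down sketch of that loop shape (the function and parameter names are illustrative, not the actual StreamExecution internals):

```scala
// Illustrative polling loop: back off when the source reports no new data.
def pollingLoop(
    dataAvailable: () => Boolean,
    runBatch: () => Unit,
    shouldStop: () => Boolean,
    pollingDelayMs: Long): Unit = {
  var currentBatchId = 0L
  while (!shouldStop()) {
    if (dataAvailable()) {
      runBatch()
      currentBatchId += 1 // advance only after the current batch's data is processed
    } else {
      Thread.sleep(pollingDelayMs) // avoid pegging a core while idle
    }
  }
}
```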
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4b8916f59c..1a9bb6a0b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -534,7 +534,7 @@ object SQLConf {
val FILE_SINK_LOG_CLEANUP_DELAY =
SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
.internal()
- .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
+ .doc("How long that a file is guaranteed to be visible for all readers.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(60 * 1000L) // 10 minutes
@@ -545,6 +545,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val STREAMING_POLLING_DELAY =
+ SQLConfigBuilder("spark.sql.streaming.pollingDelay")
+ .internal()
+ .doc("How long to delay polling new data when no data is available")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(10L)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 720ffaf732..f9496520f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -326,6 +326,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
"can not advance manual clock when a stream is not running")
verify(currentStream.triggerClock.isInstanceOf[ManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+ val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+ // Make sure we don't advance ManualClock too early. See SPARK-16002.
+ eventually("ManualClock has not yet entered the waiting state") {
+ assert(clock.isWaiting)
+ }
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
case StopStream =>
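The test-side change guards against advancing the clock too early: with the new sleep, the stream thread may not yet be blocked on the manual clock when the test tries to advance it, so the test first waits for the clock to report that someone is waiting. A toy clock illustrating that isWaiting handshake (a sketch only; Spark's real ManualClock lives in core and is outside this sql-only diff):

```scala
// Minimal manual clock with an isWaiting flag, mirroring the handshake used above.
class ToyManualClock {
  private var timeMs = 0L
  private var waiting = false

  def isWaiting: Boolean = synchronized { waiting }

  // Called by the stream thread: block until the clock reaches targetMs.
  def waitTillTime(targetMs: Long): Long = synchronized {
    waiting = true
    while (timeMs < targetMs) wait()
    waiting = false
    timeMs
  }

  // Called by the test thread, ideally only once isWaiting has become true.
  def advance(deltaMs: Long): Unit = synchronized {
    timeMs += deltaMs
    notifyAll()
  }
}
```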