From c399c7f0e485dcfc6cbc343bc246b8adc3f0648c Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 21 Jun 2016 12:42:49 -0700
Subject: [SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage

## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new
data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu

Closes #13718 from zsxwing/SPARK-16002.
---
 .../spark/sql/execution/datasources/ListingFileCatalog.scala    | 2 +-
 .../spark/sql/execution/datasources/fileSourceInterfaces.scala  | 2 +-
 .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 8 +++++++-
 .../apache/spark/sql/execution/streaming/StreamExecution.scala  | 5 +++++
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 9 ++++++++-
 .../test/scala/org/apache/spark/sql/streaming/StreamTest.scala  | 5 +++++
 6 files changed, 27 insertions(+), 4 deletions(-)
(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index d96cf1bf07..f713fdec4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -82,7 +82,7 @@ class ListingFileCatalog(
     val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
-      logInfo(s"Listing $path on driver")
+      logTrace(s"Listing $path on driver")
       Try {
         HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
       }.getOrElse(Array.empty[FileStatus])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4ac555be7f..521eb7ffcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -389,7 +389,7 @@ private[sql] object HadoopFsRelation extends Logging {
   // tasks/jobs may leave partial/corrupted data files there. Files and directories whose name
   // start with "." are also ignored.
   def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
-    logInfo(s"Listing ${status.getPath}")
+    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty
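For context on what these demoted log lines trace: `listLeafFiles` recursively walks a directory tree and keeps only data files. A minimal sketch of that pattern is below; the exact rules of `shouldFilterOut` are not part of this hunk, so the hidden/temporary-name check here is an assumption.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, PathFilter}

object ListLeafFilesSketch {
  // Illustrative only: recursively collect leaf (non-directory) files,
  // skipping names that look like hidden or temporary output. The real
  // rules live in HadoopFsRelation.shouldFilterOut; this check is assumed.
  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    val name = status.getPath.getName.toLowerCase
    if (name.startsWith(".") || name.startsWith("_temporary")) {
      Array.empty
    } else if (status.isDirectory) {
      // Recurse into subdirectories and flatten the per-child results.
      fs.listStatus(status.getPath).flatMap(listLeafFiles(fs, _, filter))
    } else if (filter == null || filter.accept(status.getPath)) {
      Array(status)
    } else {
      Array.empty
    }
  }
}
```

Because this walk runs on the driver for every poll, logging each path at INFO was noisy in practice, hence the demotion to TRACE.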
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 9886ad0b41..11bf3c0bd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -120,7 +120,13 @@ class FileStreamSource(
     val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
     val files = catalog.allFiles().map(_.getPath.toUri.toString)
     val endTime = System.nanoTime
-    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+    val listingTimeMs = (endTime.toDouble - startTime) / 1000000
+    if (listingTimeMs > 2000) {
+      // Output a warning when listing files takes more than 2 seconds.
+      logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+    } else {
+      logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+    }
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     files
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bb42a11759..1428b97149 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -56,6 +57,8 @@ class StreamExecution(

   import org.apache.spark.sql.streaming.StreamingQueryListener._

+  private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -190,6 +193,8 @@ class StreamExecution(
         runBatch()
         // We'll increase currentBatchId after we complete processing current batch's data
         currentBatchId += 1
+      } else {
+        Thread.sleep(pollingDelayMs)
       }
       true
     } else {
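The core of the fix is visible in the last hunk above: when no data is available, the batch loop now sleeps instead of re-polling immediately. A stripped-down sketch of that loop shape follows; stub names like `dataAvailable`, `runBatch`, and `stopped` stand in for StreamExecution internals and are not the real members.

```scala
object PollingLoopSketch {
  // Hypothetical stand-ins for StreamExecution internals; not real members.
  @volatile var stopped = false
  def dataAvailable: Boolean = false // would compare source offsets to committed ones
  def runBatch(): Unit = ()          // would construct and run one micro-batch

  def runBatches(pollingDelayMs: Long): Unit = {
    var currentBatchId = 0L
    while (!stopped) {
      if (dataAvailable) {
        runBatch()
        // Only advance the batch id once the current batch's data is processed.
        currentBatchId += 1
      } else {
        // No new data: back off briefly instead of spinning at 100% CPU.
        Thread.sleep(pollingDelayMs)
      }
    }
  }
}
```

Because the sleep sits only on the empty-poll branch, a busy stream pays no extra latency; an idle stream polls at most once per `pollingDelayMs` (10ms by default).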
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4b8916f59c..1a9bb6a0b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -534,7 +534,7 @@ object SQLConf {
   val FILE_SINK_LOG_CLEANUP_DELAY =
     SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
       .internal()
-      .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
+      .doc("How long a file is guaranteed to be visible for all readers.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(60 * 1000L) // 1 minute

@@ -545,6 +545,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)

+  val STREAMING_POLLING_DELAY =
+    SQLConfigBuilder("spark.sql.streaming.pollingDelay")
+      .internal()
+      .doc("How long to delay polling new data when no data is available.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10L)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 720ffaf732..f9496520f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -326,6 +326,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           "can not advance manual clock when a stream is not running")
         verify(currentStream.triggerClock.isInstanceOf[ManualClock],
           s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+        val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+        // Make sure we don't advance ManualClock too early. See SPARK-16002.
+        eventually("ManualClock has not yet entered the waiting state") {
+          assert(clock.isWaiting)
+        }
         currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)

       case StopStream =>
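The StreamTest change guards against an ordering race that the new sleep makes more likely: the test thread could call `advance` while the stream thread is still in `Thread.sleep(pollingDelayMs)` rather than blocked on the clock. Polling `isWaiting` first ensures the advance lands on a thread already parked in `waitTillTime`. The sketch below models only the two operations the test relies on; Spark's real `ManualClock` lives in `org.apache.spark.util`, and the `isWaiting` flag here is an assumption about behavior not shown in this sql-only diff.

```scala
// Sketch of a manual clock with the two operations the test relies on.
class ManualClockSketch(private var now: Long = 0L) {
  private var waiting = false

  def isWaiting: Boolean = synchronized { waiting }

  // Called by the test thread to move time forward and wake the waiter.
  def advance(ms: Long): Unit = synchronized {
    now += ms
    notifyAll()
  }

  // Called by the stream thread; blocks until the clock reaches `target`.
  def waitTillTime(target: Long): Long = synchronized {
    waiting = true
    try {
      while (now < target) wait()
      now
    } finally {
      waiting = false
    }
  }
}
```

With this shape, advancing before the stream thread reaches `waitTillTime` would let the tick land while nothing is waiting, which is exactly what the added `eventually` check rules out.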