author     Andrew Or <andrew@databricks.com>   2015-05-21 14:33:11 -0700
committer  Yin Huai <yhuai@databricks.com>     2015-05-21 14:33:11 -0700
commit     5287eec5a6948c0c6e0baaebf35f512324c0679a (patch)
tree       078d0af874331e537667fe5ad8ee5fa3ab6023bb /sql
parent     6b18cdc1b1284b1d48d637d06a1e64829aeb6202 (diff)
[SPARK-7718] [SQL] Speed up partitioning by avoiding closure cleaning
According to yhuai we spent 6-7 seconds cleaning closures in a partitioning job that takes 12 seconds. Since we provide these closures in Spark we know for sure they are serializable, so we can bypass the cleaning.

Author: Andrew Or <andrew@databricks.com>

Closes #6256 from andrewor14/sql-partition-speed-up and squashes the following commits:

a82b451 [Andrew Or] Fix style
10f7e3e [Andrew Or] Avoid getting call sites and cleaning closures
17e2943 [Andrew Or] Merge branch 'master' of github.com:apache/spark into sql-partition-speed-up
523f042 [Andrew Or] Skip unnecessary Utils.getCallSites too
f7fe143 [Andrew Or] Avoid unnecessary closure cleaning
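The core pattern of the patch is to build internal RDDs without the ClosureCleaner pass and without computing a call site. Below is a minimal sketch of that pattern, assuming the code lives inside the org.apache.spark namespace so that the private[spark] APIs (MapPartitionsRDD, Utils) are visible; mapPartitionsUncleaned is a hypothetical helper, not part of this patch.

    package org.apache.spark.sql.sources

    import scala.reflect.ClassTag

    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
    import org.apache.spark.util.Utils

    object ClosureCleaningBypassSketch {
      // Functionally equivalent to `prev.mapPartitions(f)`, but skips the ClosureCleaner
      // (safe only when `f` is defined by Spark and known to be serializable) and the
      // call-site computation that RDD construction would otherwise perform.
      def mapPartitionsUncleaned[T: ClassTag, U: ClassTag](
          prev: RDD[T])(f: Iterator[T] => Iterator[U]): RDD[U] = {
        val func = (_: TaskContext, _: Int, iter: Iterator[T]) => f(iter)
        Utils.withDummyCallSite(prev.sparkContext) {
          new MapPartitionsRDD[U, T](prev, func, preservesPartitioning = false)
        }
      }
    }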
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala          | 98
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala  | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala     |  4
3 files changed, 65 insertions, 55 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 32986aa3ec..cb1e60883d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -33,6 +33,7 @@ import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
+import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
@@ -40,7 +41,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
+import org.apache.spark.util.Utils
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -264,57 +265,58 @@ private[sql] class ParquetRelation2(
val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
- // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
- // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
- // footers. Especially when a global arbitrative schema (either from metastore or data source
- // DDL) is available.
- new SqlNewHadoopRDD(
- sc = sqlContext.sparkContext,
- broadcastedConf = broadcastedConf,
- initDriverSideJobFuncOpt = Some(setInputPaths),
- initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
- inputFormatClass = classOf[FilteringParquetRowInputFormat],
- keyClass = classOf[Void],
- valueClass = classOf[Row]) {
-
- val cacheMetadata = useMetadataCache
-
- @transient val cachedStatuses = inputFiles.map { f =>
- // In order to encode the authority of a Path containing special characters such as /,
- // we need to use the string returned by the URI of the path to create a new Path.
- val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
- new FileStatus(
- f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
- f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
- }.toSeq
-
- @transient val cachedFooters = footers.map { f =>
- // In order to encode the authority of a Path containing special characters such as /,
- // we need to use the string returned by the URI of the path to create a new Path.
- new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
- }.toSeq
-
- // Overridden so we can inject our own cached files statuses.
- override def getPartitions: Array[SparkPartition] = {
- val inputFormat = if (cacheMetadata) {
- new FilteringParquetRowInputFormat {
- override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
- override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+ Utils.withDummyCallSite(sqlContext.sparkContext) {
+ // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+ // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
+ // and footers. Especially when a global arbitrative schema (either from metastore or data
+ // source DDL) is available.
+ new SqlNewHadoopRDD(
+ sc = sqlContext.sparkContext,
+ broadcastedConf = broadcastedConf,
+ initDriverSideJobFuncOpt = Some(setInputPaths),
+ initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+ inputFormatClass = classOf[FilteringParquetRowInputFormat],
+ keyClass = classOf[Void],
+ valueClass = classOf[Row]) {
+
+ val cacheMetadata = useMetadataCache
+
+ @transient val cachedStatuses = inputFiles.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+ new FileStatus(
+ f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+ f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+ }.toSeq
+
+ @transient val cachedFooters = footers.map { f =>
+ // In order to encode the authority of a Path containing special characters such as /,
+ // we need to use the string returned by the URI of the path to create a new Path.
+ new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+ }.toSeq
+
+ // Overridden so we can inject our own cached files statuses.
+ override def getPartitions: Array[SparkPartition] = {
+ val inputFormat = if (cacheMetadata) {
+ new FilteringParquetRowInputFormat {
+ override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+ override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+ }
+ } else {
+ new FilteringParquetRowInputFormat
}
- } else {
- new FilteringParquetRowInputFormat
- }
- val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
- val rawSplits = inputFormat.getSplits(jobContext)
+ val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+ val rawSplits = inputFormat.getSplits(jobContext)
- Array.tabulate[SparkPartition](rawSplits.size) { i =>
- new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ Array.tabulate[SparkPartition](rawSplits.size) { i =>
+ new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
}
- }
- }.values
+ }.values
+ }
}
private class MetadataCache {
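For context, Utils.withDummyCallSite (used above) suppresses the stack walk Spark performs in Utils.getCallSite each time an RDD is constructed. The following is a simplified, hypothetical sketch of how such a wrapper can work; it is not Spark's exact implementation, and the local-property keys are assumptions (the real constants live in org.apache.spark.util.CallSite).

    import org.apache.spark.SparkContext

    object CallSiteSuppressionSketch {
      // Temporarily install an empty call site in the SparkContext's local properties so
      // that RDDs created inside `body` skip the stack-walking in Utils.getCallSite,
      // then restore whatever the caller had set before.
      def withSuppressedCallSite[T](sc: SparkContext)(body: => T): T = {
        val shortKey = "callSite.short"  // assumed key; see org.apache.spark.util.CallSite
        val longKey = "callSite.long"    // assumed key
        val oldShort = sc.getLocalProperty(shortKey)
        val oldLong = sc.getLocalProperty(longKey)
        sc.setLocalProperty(shortKey, "")
        sc.setLocalProperty(longKey, "")
        try {
          body
        } finally {
          sc.setLocalProperty(shortKey, oldShort)
          sc.setLocalProperty(longKey, oldLong)
        }
      }
    }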
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 550090d22d..c03649d00b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.sources
-import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
+import org.apache.spark.util.Utils
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -197,7 +198,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}
}
- dataRows.mapPartitions { iterator =>
+ // Since we know for sure that this closure is serializable, we can avoid the overhead
+ // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+ // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
+ val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
val dataTypes = requiredColumns.map(schema(_).dataType)
val mutableRow = new SpecificMutableRow(dataTypes)
iterator.map { dataRow =>
@@ -209,6 +213,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
mutableRow.asInstanceOf[expressions.Row]
}
}
+
+ // This is an internal RDD whose call site the user should not be concerned with
+ // Since we create many of these (one per partition), the time spent on computing
+ // the call site may add up.
+ Utils.withDummyCallSite(dataRows.sparkContext) {
+ new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+ }
+
} else {
dataRows
}
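One trade-off of bypassing the ClosureCleaner is that serializability is no longer verified when the RDD is defined; a non-serializable function would only fail once tasks are serialized. A hedged sketch of a check a test suite could run is shown below; assertSerializable is a hypothetical helper and requires an active SparkContext so that SparkEnv.get is non-null.

    import org.apache.spark.SparkEnv

    object SerializabilityCheckSketch {
      // Round-trip a closure through Spark's closure serializer so that a
      // non-serializable function fails fast in a test rather than at job submission.
      def assertSerializable(f: AnyRef): Unit = {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        ser.deserialize[AnyRef](ser.serialize(f))  // throws if f cannot be serialized
      }
    }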
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 0c7bb6e50c..a74a98631d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
with SparkHadoopMapReduceUtil
with Logging {
- if (initLocalJobFuncOpt.isDefined) {
- sc.clean(initLocalJobFuncOpt.get)
- }
-
protected def getJob(): Job = {
val conf: Configuration = broadcastedConf.value.value
// "new Job" will make a copy of the conf. Then, it is