aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-08-21 14:30:00 -0700
committerYin Huai <yhuai@databricks.com>2015-08-21 14:30:00 -0700
commite3355090d4030daffed5efb0959bf1d724c13c13 (patch)
treee0426a0d44f23bb84cab25bcf1d369564c303f97 /sql
parentf5b028ed2f1ad6de43c8b50ebf480e1b6c047035 (diff)
downloadspark-e3355090d4030daffed5efb0959bf1d724c13c13.tar.gz
spark-e3355090d4030daffed5efb0959bf1d724c13c13.tar.bz2
spark-e3355090d4030daffed5efb0959bf1d724c13c13.zip
[SPARK-10143] [SQL] Use parquet's block size (row group size) setting as the min split size if necessary.
https://issues.apache.org/jira/browse/SPARK-10143 With this PR, we will set min split size to parquet's block size (row group size) set in the conf if the min split size is smaller. So, we can avoid have too many tasks and even useless tasks for reading parquet data. I tested it locally. The table I have has 343MB and it is in my local FS. Because I did not set any min/max split size, the default split size was 32MB and the map stage had 11 tasks. But there were only three tasks that actually read data. With my PR, there were only three tasks in the map stage. Here is the difference. Without this PR: ![image](https://cloud.githubusercontent.com/assets/2072857/9399179/8587dba6-4765-11e5-9189-7ebba52a2b6d.png) With this PR: ![image](https://cloud.githubusercontent.com/assets/2072857/9399185/a4735d74-4765-11e5-8848-1f1e361a6b4b.png) Even if the block size setting does match the actual block size of parquet file, I think it is still generally good to use parquet's block size setting if min split size is smaller than this block size. Tested it on a cluster using ``` val count = sqlContext.table("""store_sales""").groupBy().count().queryExecution.executedPlan(3).execute().count ``` Basically, it reads 0 column of table `store_sales`. My table has 1824 parquet files with size from 80MB to 280MB (1 to 3 row group sizes). Without this patch, in a 16 worker cluster, the job had 5023 tasks and spent 102s. With this patch, the job had 2893 tasks and spent 64s. It is still not as good as using one mapper per file (1824 tasks and 42s), but it is much better than our master. Author: Yin Huai <yhuai@databricks.com> Closes #8346 from yhuai/parquetMinSplit.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala41
1 files changed, 39 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 68169d48ac..bbf682aec0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
import scala.util.{Failure, Try}
import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -281,12 +282,18 @@ private[sql] class ParquetRelation(
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ // Parquet row group size. We will use this value as the value for
+ // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
+ // of these flags are smaller than the parquet row group size.
+ val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
+
// Create the function to set variable Parquet confs at both driver and executor side.
val initLocalJobFuncOpt =
ParquetRelation.initializeLocalJobFunc(
requiredColumns,
filters,
dataSchema,
+ parquetBlockSize,
useMetadataCache,
parquetFilterPushDown,
assumeBinaryIsString,
@@ -294,7 +301,8 @@ private[sql] class ParquetRelation(
followParquetFormatSpec) _
// Create the function to set input paths at the driver side.
- val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
+ val setInputPaths =
+ ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
Utils.withDummyCallSite(sqlContext.sparkContext) {
new SqlNewHadoopRDD(
@@ -482,11 +490,35 @@ private[sql] object ParquetRelation extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+ /**
+ * If parquet's block size (row group size) setting is larger than the min split size,
+ * we use parquet's block size setting as the min split size. Otherwise, we will create
+ * tasks processing nothing (because a split does not cover the starting point of a
+ * parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
+ */
+ private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
+ val minSplitSize =
+ math.max(
+ conf.getLong("mapred.min.split.size", 0L),
+ conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
+ if (parquetBlockSize > minSplitSize) {
+ val message =
+ s"Parquet's block size (row group size) is larger than " +
+ s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
+ s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
+ s"$parquetBlockSize."
+ logDebug(message)
+ conf.set("mapred.min.split.size", parquetBlockSize.toString)
+ conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
+ }
+ }
+
/** This closure sets various Parquet configurations at both driver side and executor side. */
private[parquet] def initializeLocalJobFunc(
requiredColumns: Array[String],
filters: Array[Filter],
dataSchema: StructType,
+ parquetBlockSize: Long,
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
@@ -522,16 +554,21 @@ private[sql] object ParquetRelation extends Logging {
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+
+ overrideMinSplitSize(parquetBlockSize, conf)
}
/** This closure sets input paths at the driver side. */
private[parquet] def initializeDriverSideJobFunc(
- inputFiles: Array[FileStatus])(job: Job): Unit = {
+ inputFiles: Array[FileStatus],
+ parquetBlockSize: Long)(job: Job): Unit = {
// We side the input paths at the driver side.
logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
+
+ overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
}
private[parquet] def readSchema(