author | gatorsmile <gatorsmile@gmail.com> | 2017-01-13 13:05:53 +0800
committer | Wenchen Fan <wenchen@databricks.com> | 2017-01-13 13:05:53 +0800
commit | 3356b8b6a9184fcab8d0fe993f3545c3beaa4d99 (patch)
tree | d61a3ab17d0d263d8f3c7bf7200d6a5ab9bcda5e /sql/core/src
parent | c983267b0853f908d1c671cedd18b159e6993df1 (diff)
download | spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.tar.gz spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.tar.bz2 spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.zip
[SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files
### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full filesystem scan over the files it has just saved. Since save() is the most basic/core API in `DataFrameWriter`, this scan should be avoided.
The related PR: https://github.com/apache/spark/pull/16090
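As a minimal sketch of the user-facing path this touches (the local output path and sample data below are illustrative, not part of the patch): after this change, `save()` only runs the write job; it no longer resolves a relation over the output directory, so the just-written files are not listed again. Only the CREATE TABLE AS SELECT path still needs a relation back, and that is now served by the separate `writeAndRead()` method.

```scala
import org.apache.spark.sql.SparkSession

object SaveWithoutRescanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-19092 example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // DataFrameWriter.save() now goes through DataSource.write(), which returns Unit,
    // so the output directory is not re-scanned after the write job finishes.
    df.write.mode("overwrite").parquet("/tmp/spark-19092-demo")

    spark.stop()
  }
}
```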
### How was this patch tested?
Updated the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16481 from gatorsmile/saveFileScan.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 2
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 172
2 files changed, 99 insertions, 75 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 73b21533a2..90aeebd932 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.write(mode, Dataset.ofRows(session, query))
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b7f3559b65..29afe5751b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,85 @@ case class DataSource(
     relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-      mode: SaveMode,
-      data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+    // Don't glob path for the write path. The contracts here are:
+    //  1. Only one output path can be specified on the write path;
+    //  2. Output path must be a legal HDFS style file system path;
+    //  3. It's OK that the output path doesn't exist yet;
+    val allPaths = paths ++ caseInsensitiveOptions.get("path")
+    val outputPath = if (allPaths.length == 1) {
+      val path = new Path(allPaths.head)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${allPaths.mkString(", ")}")
+    }
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+    // If we are appending to a table that already exists, make sure the partitioning matches
+    // up. If we fail to load the table for whatever reason, ignore the check.
+    if (mode == SaveMode.Append) {
+      val existingPartitionColumns = Try {
+        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+      }.getOrElse(Seq.empty[String])
+      // TODO: Case sensitivity.
+      val sameColumns =
+        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingPartitionColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
+    }
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+    val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+      sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+      }.head
+    }
+    // For partitioned relation r, r.schema's column ordering can be different from the column
+    // ordering of data.logicalPlan (partition columns are all moved after data column). This
+    // will be adjusted within InsertIntoHadoopFsRelation.
+    val plan =
+      InsertIntoHadoopFsRelationCommand(
+        outputPath = outputPath,
+        staticPartitions = Map.empty,
+        partitionColumns = partitionAttributes,
+        bucketSpec = bucketSpec,
+        fileFormat = format,
+        options = options,
+        query = data.logicalPlan,
+        mode = mode,
+        catalogTable = catalogTable,
+        fileIndex = fileIndex)
+    sparkSession.sessionState.executePlan(plan).toRdd
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+   * the following reading.
+   */
+  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -425,78 +500,27 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
-        // Don't glob path for the write path. The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ caseInsensitiveOptions.get("path")
-        val outputPath = if (allPaths.length == 1) {
-          val path = new Path(allPaths.head)
-          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
-            s"got: ${allPaths.mkString(", ")}")
-        }
-
-        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumn(
-          data.schema, partitionColumns, caseSensitive)
-
-        // If we are appending to a table that already exists, make sure the partitioning matches
-        // up. If we fail to load the table for whatever reason, ignore the check.
-        if (mode == SaveMode.Append) {
-          val existingPartitionColumns = Try {
-            getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-          }.getOrElse(Seq.empty[String])
-          // TODO: Case sensitivity.
-          val sameColumns =
-            existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.nonEmpty && !sameColumns) {
-            throw new AnalysisException(
-              s"""Requested partitioning does not match existing partitioning.
-                 |Existing partitioning columns:
-                 |  ${existingPartitionColumns.mkString(", ")}
-                 |Requested partitioning columns:
-                 |  ${partitionColumns.mkString(", ")}
-                 |""".stripMargin)
-          }
-        }
-
-        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-        // not need to have the query as child, to avoid to analyze an optimized query,
-        // because InsertIntoHadoopFsRelationCommand will be optimized first.
-        val partitionAttributes = partitionColumns.map { name =>
-          val plan = data.logicalPlan
-          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-            throw new AnalysisException(
-              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-          }.asInstanceOf[Attribute]
-        }
-        val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
-          sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-            case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
-          }.head
-        }
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        val plan =
-          InsertIntoHadoopFsRelationCommand(
-            outputPath = outputPath,
-            staticPartitions = Map.empty,
-            partitionColumns = partitionAttributes,
-            bucketSpec = bucketSpec,
-            fileFormat = format,
-            options = options,
-            query = data.logicalPlan,
-            mode = mode,
-            catalogTable = catalogTable,
-            fileIndex = fileIndex)
-        sparkSession.sessionState.executePlan(plan).toRdd
-        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+        writeInFileFormat(format, mode, data)
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+      case _ =>
+        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+    }
+  }
+
+  /**
+   * Writes the given [[DataFrame]] out to this [[DataSource]].
+   */
+  def write(mode: SaveMode, data: DataFrame): Unit = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+
+    providingClass.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+      case format: FileFormat =>
+        writeInFileFormat(format, mode, data)
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
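To make the shape of the refactoring above easier to see, here is a deliberately simplified, self-contained sketch; `SimpleDataSource`, `BaseRelationLike`, and `WrittenRelation` are made-up stand-ins, not Spark's actual classes. The shared write logic lives in a private helper (mirroring `writeInFileFormat`), `write()` returns `Unit` for the `save()` path, and only `writeAndRead()`, used by CREATE TABLE AS SELECT, resolves a relation over the written output.

```scala
// Illustrative sketch only: these types stand in for DataSource, BaseRelation,
// and resolveRelation(); they are not the real Spark classes.
trait BaseRelationLike
final case class WrittenRelation(path: String) extends BaseRelationLike

final case class SimpleDataSource(path: String) {

  // Shared write logic, analogous to the new private writeInFileFormat():
  // it runs the write job and never lists the output files.
  private def writeInFileFormat(rows: Seq[String]): Unit =
    println(s"writing ${rows.size} rows to $path")

  // save() path: write only, nothing returned, so no post-write file scan.
  def write(rows: Seq[String]): Unit =
    writeInFileFormat(rows)

  // CTAS path: write, then resolve a relation for the follow-up read;
  // only this branch pays for looking at the written output again.
  def writeAndRead(rows: Seq[String]): BaseRelationLike = {
    writeInFileFormat(rows)
    WrittenRelation(path)
  }
}

object SimpleDataSourceDemo extends App {
  val ds = SimpleDataSource("/tmp/demo")
  ds.write(Seq("a", "b"))             // like DataFrameWriter.save()
  val rel = ds.writeAndRead(Seq("c")) // like CREATE TABLE AS SELECT
  println(rel)
}
```

The design point is simply that relation resolution, and the file listing it implies, happens only on the branch that actually needs a relation returned.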