author     gatorsmile <gatorsmile@gmail.com>      2017-01-13 13:05:53 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2017-01-13 13:05:53 +0800
commit     3356b8b6a9184fcab8d0fe993f3545c3beaa4d99 (patch)
tree       d61a3ab17d0d263d8f3c7bf7200d6a5ab9bcda5e /sql/core/src
parent     c983267b0853f908d1c671cedd18b159e6993df1 (diff)
download   spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.tar.gz
           spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.tar.bz2
           spark-3356b8b6a9184fcab8d0fe993f3545c3beaa4d99.zip
[SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files
### What changes were proposed in this pull request?

`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) performs an unnecessary full file-system scan of the files it has just saved. Since save() is the most basic/core API in `DataFrameWriter`, we should avoid this scan. Related PR: https://github.com/apache/spark/pull/16090

### How was this patch tested?

Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16481 from gatorsmile/saveFileScan.
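For orientation, here is a minimal, self-contained sketch (the app name, local master, and output path are assumptions for illustration, not part of this commit) of the user-facing call path the patch optimizes: `DataFrameWriter.save()` returns `Unit`, so after this change the underlying `DataSource.write()` only writes the files and no longer resolves, and therefore no longer lists, the output it just produced.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-19092-save-example")   // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // DataFrameWriter.save() returns Unit; with this patch the underlying
    // DataSource.write() no longer resolves a relation over (and so no longer
    // scans) the files it has just written.
    df.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("/tmp/spark-19092-example")   // output path assumed for the sketch

    spark.stop()
  }
}
```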
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala          172
2 files changed, 99 insertions, 75 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 73b21533a2..90aeebd932 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)
try {
- dataSource.write(mode, Dataset.ofRows(session, query))
+ dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b7f3559b65..29afe5751b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,85 @@ case class DataSource(
relation
}
- /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
- def write(
- mode: SaveMode,
- data: DataFrame): BaseRelation = {
+ /**
+ * Writes the given [[DataFrame]] out in this [[FileFormat]].
+ */
+ private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+ // Don't glob path for the write path. The contracts here are:
+ // 1. Only one output path can be specified on the write path;
+ // 2. Output path must be a legal HDFS style file system path;
+ // 3. It's OK that the output path doesn't exist yet;
+ val allPaths = paths ++ caseInsensitiveOptions.get("path")
+ val outputPath = if (allPaths.length == 1) {
+ val path = new Path(allPaths.head)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ } else {
+ throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+ s"got: ${allPaths.mkString(", ")}")
+ }
+
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+ // If we are appending to a table that already exists, make sure the partitioning matches
+ // up. If we fail to load the table for whatever reason, ignore the check.
+ if (mode == SaveMode.Append) {
+ val existingPartitionColumns = Try {
+ getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
+ }.getOrElse(Seq.empty[String])
+ // TODO: Case sensitivity.
+ val sameColumns =
+ existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
+ if (existingPartitionColumns.nonEmpty && !sameColumns) {
+ throw new AnalysisException(
+ s"""Requested partitioning does not match existing partitioning.
+ |Existing partitioning columns:
+ | ${existingPartitionColumns.mkString(", ")}
+ |Requested partitioning columns:
+ | ${partitionColumns.mkString(", ")}
+ |""".stripMargin)
+ }
+ }
+
+ // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+ // not need to have the query as child, to avoid to analyze an optimized query,
+ // because InsertIntoHadoopFsRelationCommand will be optimized first.
+ val partitionAttributes = partitionColumns.map { name =>
+ val plan = data.logicalPlan
+ plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+ throw new AnalysisException(
+ s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+ }.asInstanceOf[Attribute]
+ }
+ val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+ sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+ case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+ }.head
+ }
+ // For partitioned relation r, r.schema's column ordering can be different from the column
+ // ordering of data.logicalPlan (partition columns are all moved after data column). This
+ // will be adjusted within InsertIntoHadoopFsRelation.
+ val plan =
+ InsertIntoHadoopFsRelationCommand(
+ outputPath = outputPath,
+ staticPartitions = Map.empty,
+ partitionColumns = partitionAttributes,
+ bucketSpec = bucketSpec,
+ fileFormat = format,
+ options = options,
+ query = data.logicalPlan,
+ mode = mode,
+ catalogTable = catalogTable,
+ fileIndex = fileIndex)
+ sparkSession.sessionState.executePlan(plan).toRdd
+ }
+
+ /**
+ * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
+ * the following reading.
+ */
+ def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
@@ -425,78 +500,27 @@ case class DataSource(
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
- // Don't glob path for the write path. The contracts here are:
- // 1. Only one output path can be specified on the write path;
- // 2. Output path must be a legal HDFS style file system path;
- // 3. It's OK that the output path doesn't exist yet;
- val allPaths = paths ++ caseInsensitiveOptions.get("path")
- val outputPath = if (allPaths.length == 1) {
- val path = new Path(allPaths.head)
- val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
- path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- } else {
- throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
- s"got: ${allPaths.mkString(", ")}")
- }
-
- val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- PartitioningUtils.validatePartitionColumn(
- data.schema, partitionColumns, caseSensitive)
-
- // If we are appending to a table that already exists, make sure the partitioning matches
- // up. If we fail to load the table for whatever reason, ignore the check.
- if (mode == SaveMode.Append) {
- val existingPartitionColumns = Try {
- getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
- }.getOrElse(Seq.empty[String])
- // TODO: Case sensitivity.
- val sameColumns =
- existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
- if (existingPartitionColumns.nonEmpty && !sameColumns) {
- throw new AnalysisException(
- s"""Requested partitioning does not match existing partitioning.
- |Existing partitioning columns:
- | ${existingPartitionColumns.mkString(", ")}
- |Requested partitioning columns:
- | ${partitionColumns.mkString(", ")}
- |""".stripMargin)
- }
- }
-
- // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
- // not need to have the query as child, to avoid to analyze an optimized query,
- // because InsertIntoHadoopFsRelationCommand will be optimized first.
- val partitionAttributes = partitionColumns.map { name =>
- val plan = data.logicalPlan
- plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
- throw new AnalysisException(
- s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
- }.asInstanceOf[Attribute]
- }
- val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
- sparkSession.table(tableIdent).queryExecution.analyzed.collect {
- case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
- }.head
- }
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). This
- // will be adjusted within InsertIntoHadoopFsRelation.
- val plan =
- InsertIntoHadoopFsRelationCommand(
- outputPath = outputPath,
- staticPartitions = Map.empty,
- partitionColumns = partitionAttributes,
- bucketSpec = bucketSpec,
- fileFormat = format,
- options = options,
- query = data.logicalPlan,
- mode = mode,
- catalogTable = catalogTable,
- fileIndex = fileIndex)
- sparkSession.sessionState.executePlan(plan).toRdd
- // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+ writeInFileFormat(format, mode, data)
+ // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+ case _ =>
+ sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+ }
+ }
+ /**
+ * Writes the given [[DataFrame]] out to this [[DataSource]].
+ */
+ def write(mode: SaveMode, data: DataFrame): Unit = {
+ if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+ throw new AnalysisException("Cannot save interval data type into external storage.")
+ }
+
+ providingClass.newInstance() match {
+ case dataSource: CreatableRelationProvider =>
+ dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+ case format: FileFormat =>
+ writeInFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
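Reading the two hunks together, the patch splits the old `write()` into a scan-free `write(): Unit` and a `writeAndRead(): BaseRelation` that still resolves a relation for callers that need one. Below is a hedged sketch of how the two entry points are meant to be used; the wrapper object and method names are hypothetical, only the `DataSource` method signatures come from the diff above.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical wrappers for illustration only; DataSource is an internal Spark API.
object WritePathSketch {
  // Plain save() path: persist the data and return nothing, so there is no
  // follow-up resolveRelation() call and no listing of the freshly written files.
  def saveOnly(dataSource: DataSource, mode: SaveMode, df: DataFrame): Unit =
    dataSource.write(mode, df)

  // CTAS path (CreateDataSourceTableAsSelectCommand in the first hunk): a relation
  // for the new table is still needed afterwards, so this path explicitly opts
  // into the post-write resolution via writeAndRead().
  def saveForCtas(dataSource: DataSource, mode: SaveMode, df: DataFrame): BaseRelation =
    dataSource.writeAndRead(mode, df)
}
```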