diff options
author | Cheng Lian <lian@databricks.com> | 2015-05-13 01:32:28 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-05-13 01:32:28 +0800 |
commit | 0595b6de8f1da04baceda082553c2aa1aa2cb006 (patch) | |
tree | abfc400a323f64d324a5ea245294032433313001 /sql/hive/src | |
parent | 831504cf6bde6b1131005d5552e56a842725c84c (diff) | |
download | spark-0595b6de8f1da04baceda082553c2aa1aa2cb006.tar.gz spark-0595b6de8f1da04baceda082553c2aa1aa2cb006.tar.bz2 spark-0595b6de8f1da04baceda082553c2aa1aa2cb006.zip |
[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API
This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path. Existing data sources like JSON and Parquet can be simplified with this work.
## New features provided
1. Hive compatible partition discovery
This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.
1. Generalized partition pruning optimization
Now partition pruning is handled during physical planning phase. Specific data sources don't need to worry about this harness anymore.
(This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)
1. Insertion with dynamic partitions
When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.
## New structures provided
### Developer API
1. `FSBasedRelation`
Base abstract class for file system based data sources.
1. `OutputWriter`
Base abstract class for output row writers, responsible for writing a single row object.
1. `FSBasedRelationProvider`
A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.
### User API
New overloaded versions of
1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`
are provided to allow users to save/load DataFrames with user defined dynamic partition columns.
### Spark SQL query planning
1. `InsertIntoFSBasedRelation`
Used to implement write path for `FSBasedRelation`s.
1. New rules for `FSBasedRelation` in `DataSourceStrategy`
These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.
## TODO
- [ ] Use scratch directories when overwriting a table with data selected from itself.
Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.
- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.
This ensures that we only need to open a single `OutputWriter` at a time. For data sources like Parquet, `OutputWriter`s can be quite memory consuming. One issue is that, this approach breaks the row distribution in the original DataFrame. However, we did't promise to preserve data distribution when writing a DataFrame.
- [x] More tests. Specifically, test cases for
- [x] Self-join
- [x] Loading partitioned relations with a subset of partition columns stored in data files.
- [x] `SQLContext.load()` with user defined dynamic partition columns.
## Parquet data source migration
Parquet data source migration is covered in PR https://github.com/liancheng/spark/pull/6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.
Author: Cheng Lian <lian@databricks.com>
Closes #5526 from liancheng/partitioning-support and squashes the following commits:
5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos. Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
Diffstat (limited to 'sql/hive/src')
6 files changed, 675 insertions, 23 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index bbf48efb24..d754c8e3a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -19,25 +19,24 @@ package org.apache.spark.sql.hive import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} - import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.metastore.Warehouse +import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.Logging -import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} -import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Catalog, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec} -import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} import org.apache.spark.util.Utils /* Implicit conversions */ @@ -98,6 +97,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive ResolvedDataSource( hive, userSpecifiedSchema, + Array.empty[String], table.properties("spark.sql.sources.provider"), options) @@ -438,6 +438,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive desc.name, hive.conf.defaultDataSourceName, temporary = false, + Array.empty[String], mode, options = Map.empty[String, String], child diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index be9249a8b1..d46a127d47 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -221,14 +221,14 @@ private[hive] trait HiveStrategies { object HiveDDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case CreateTableUsing( - tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) => + tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) => ExecutedCommand( CreateMetastoreDataSource( tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil - case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) => + case CreateTableUsingAsSelect(tableName, provider, false, partitionCols, mode, opts, query) => val cmd = - CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query) + CreateMetastoreDataSourceAsSelect(tableName, provider, partitionCols, mode, opts, query) ExecutedCommand(cmd) :: Nil case _ => Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index abab1a223a..8e405e0804 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -158,6 +158,7 @@ private[hive] case class CreateMetastoreDataSourceAsSelect( tableName: String, provider: String, + partitionColumns: Array[String], mode: SaveMode, options: Map[String, String], query: LogicalPlan) extends RunnableCommand { @@ -189,12 +190,12 @@ case class CreateMetastoreDataSourceAsSelect( return Seq.empty[Row] case SaveMode.Append => // Check if the specified data source match the data source of the existing table. - val resolved = - ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath) + val resolved = ResolvedDataSource( + sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { - case l @ LogicalRelation(i: InsertableRelation) => - if (i != createdRelation.relation) { + case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) => + if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + s"match the existing relation of $tableName. " + @@ -202,14 +203,13 @@ case class CreateMetastoreDataSourceAsSelect( s"table $tableName and using its data source and options." val errorMessage = s""" - |$errorDescription - |== Relations == - |${sideBySide( - s"== Expected Relation ==" :: - l.toString :: Nil, - s"== Actual Relation ==" :: - createdRelation.toString :: Nil).mkString("\n")} - """.stripMargin + |$errorDescription + |== Relations == + |${sideBySide( + s"== Expected Relation ==" :: l.toString :: Nil, + s"== Actual Relation ==" :: createdRelation.toString :: Nil + ).mkString("\n")} + """.stripMargin throw new AnalysisException(errorMessage) } existingSchema = Some(l.schema) @@ -234,7 +234,8 @@ case class CreateMetastoreDataSourceAsSelect( } // Create the relation based on the data of df. - val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df) + val resolved = + ResolvedDataSource(sqlContext, provider, partitionColumns, mode, optionsWithPath, df) if (createMetastoreTable) { // We will use the schema of resolved.relation as the schema of the table (instead of diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 8398da2681..cbc381cc81 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -204,7 +204,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( if (string == null || string.isEmpty) { defaultPartName } else { - FileUtils.escapePathName(string) + FileUtils.escapePathName(string, defaultPartName) } s"/$col=$colString" }.mkString diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala new file mode 100644 index 0000000000..415b1cd168 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetTest +import org.apache.spark.sql.types._ + +// TODO Don't extend ParquetTest +// This test suite extends ParquetTest for some convenient utility methods. These methods should be +// moved to some more general places, maybe QueryTest. +class FSBasedRelationSuite extends QueryTest with ParquetTest { + override val sqlContext: SQLContext = TestHive + + import sqlContext._ + import sqlContext.implicits._ + + val dataSchema = + StructType( + Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = false))) + + val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") + + val partitionedTestDF1 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF2 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) + + def checkQueries(df: DataFrame): Unit = { + // Selects everything + checkAnswer( + df, + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + + // Simple filtering and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 === 2), + for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) + + // Simple projection and filtering + checkAnswer( + df.filter('a > 1).select('b, 'a + 1), + for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) + + // Simple projection and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) + + // Self-join + df.registerTempTable("t") + withTempTable("t") { + checkAnswer( + sql( + """SELECT l.a, r.b, l.p1, r.p2 + |FROM t l JOIN t r + |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 + """.stripMargin), + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + } + } + + test("save()/load() - non-partitioned table - Overwrite") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + testDF.collect()) + } + } + + test("save()/load() - non-partitioned table - Append") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)).orderBy("a"), + testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("save()/load() - non-partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("save()/load() - non-partitioned table - Ignore") { + withTempDir { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + assert(fs.listStatus(path).isEmpty) + } + } + + test("save()/load() - partitioned table - simple queries") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkQueries( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json))) + } + } + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") + .saveAsTextFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json))) + } + } + + test("save()/load() - partitioned table - Overwrite") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - Append") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("save()/load() - partitioned table - Append - new partition values") { + withTempPath { file => + partitionedTestDF1.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("save()/load() - partitioned table - Ignore") { + withTempDir { file => + partitionedTestDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(SparkHadoopUtil.get.conf) + assert(fs.listStatus(path).isEmpty) + } + } + + def withTable(tableName: String)(f: => Unit): Unit = { + try f finally sql(s"DROP TABLE $tableName") + } + + test("saveAsTable()/load() - non-partitioned table - Overwrite") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkAnswer(table("t"), testDF.collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - Append") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append) + + withTable("t") { + checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("saveAsTable()/load() - non-partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + assert(table("t").collect().isEmpty) + } + } + + test("saveAsTable()/load() - partitioned table - simple queries") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkQueries(table("t")) + } + } + + test("saveAsTable()/load() - partitioned table - Overwrite") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - new partition values") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + // Using only a subset of all partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1")) + } + + // Using different order of partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p2", "p1")) + } + } + + test("saveAsTable()/load() - partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("saveAsTable()/load() - partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + assert(table("t").collect().isEmpty) + } + } + + test("Hadoop style globbing") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + val df = load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", + "dataSchema" -> dataSchema.json)) + + val expectedPaths = Set( + s"${file.getCanonicalFile}/p1=1/p2=foo", + s"${file.getCanonicalFile}/p1=2/p2=foo", + s"${file.getCanonicalFile}/p1=1/p2=bar", + s"${file.getCanonicalFile}/p1=2/p2=bar" + ).map { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString + } + + println(df.queryExecution) + + val actualPaths = df.queryExecution.analyzed.collectFirst { + case LogicalRelation(relation: FSBasedRelation) => + relation.paths.toSet + }.getOrElse { + fail("Expect an FSBasedRelation, but none could be found") + } + + assert(actualPaths === expectedPaths) + checkAnswer(df, partitionedTestDF.collect()) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala new file mode 100644 index 0000000000..8801aba2f6 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.text.NumberFormat +import java.util.UUID + +import com.google.common.base.Objects +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} +import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext} + +/** + * A simple example [[FSBasedRelationProvider]]. + */ +class SimpleTextSource extends FSBasedRelationProvider { + override def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): FSBasedRelation = { + val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField])) + new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext) + } +} + +class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] { + val numberFormat = NumberFormat.getInstance() + + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val split = context.getTaskAttemptID.getTaskID.getId + val name = FileOutputFormat.getOutputName(context) + new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}") + } +} + +class SimpleTextOutputWriter extends OutputWriter { + private var recordWriter: RecordWriter[NullWritable, Text] = _ + private var taskAttemptContext: TaskAttemptContext = _ + + override def init( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): Unit = { + recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) + taskAttemptContext = context + } + + override def write(row: Row): Unit = { + val serialized = row.toSeq.map(_.toString).mkString(",") + recordWriter.write(null, new Text(serialized)) + } + + override def close(): Unit = recordWriter.close(taskAttemptContext) +} + +/** + * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma + * separated string lines. When scanning data, schema must be explicitly provided via data source + * option `"dataSchema"`. + */ +class SimpleTextRelation( + paths: Array[String], + val maybeDataSchema: Option[StructType], + partitionsSchema: StructType, + parameters: Map[String, String])( + @transient val sqlContext: SQLContext) + extends FSBasedRelation(paths, partitionsSchema) { + + import sqlContext.sparkContext + + override val dataSchema: StructType = + maybeDataSchema.getOrElse(DataType.fromJson(parameters("dataSchema")).asInstanceOf[StructType]) + + override def equals(other: Any): Boolean = other match { + case that: SimpleTextRelation => + this.paths.sameElements(that.paths) && + this.maybeDataSchema == that.maybeDataSchema && + this.dataSchema == that.dataSchema && + this.partitionColumns == that.partitionColumns + + case _ => false + } + + override def hashCode(): Int = + Objects.hashCode(paths, maybeDataSchema, dataSchema) + + override def outputWriterClass: Class[_ <: OutputWriter] = + classOf[SimpleTextOutputWriter] + + override def buildScan(inputPaths: Array[String]): RDD[Row] = { + val fields = dataSchema.map(_.dataType) + + sparkContext.textFile(inputPaths.mkString(",")).map { record => + Row(record.split(",").zip(fields).map { case (value, dataType) => + Cast(Literal(value), dataType).eval() + }: _*) + } + } +} |