path: root/sql/hive
author     Cheng Lian <lian@databricks.com>    2015-05-13 01:32:28 +0800
committer  Cheng Lian <lian@databricks.com>    2015-05-13 01:32:28 +0800
commit     0595b6de8f1da04baceda082553c2aa1aa2cb006 (patch)
tree       abfc400a323f64d324a5ea245294032433313001 /sql/hive
parent     831504cf6bde6b1131005d5552e56a842725c84c (diff)
[SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API
This PR adds partitioning support for the external data sources API. It aims to simplify the development of file system based data sources, and to provide first-class partitioning support for both the read path and the write path. Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive compatible partition discovery

   This generalizes the partition discovery strategy used by the Parquet data source in Spark 1.3.0.

1. Generalized partition pruning optimization

   Partition pruning is now handled during the physical planning phase. Specific data sources don't need to worry about this harness anymore.

   (This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since we no longer need to pass Catalyst expressions to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data into an `FSBasedRelation`, data can be partitioned dynamically by the specified partition columns.

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Used to implement the write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These are added to hook `FSBasedRelation` into the physical query plan in the read path, and to perform partition pruning.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table being overwritten is always deleted before any data is written to it.

- [ ] When inserting with dynamic partition columns, use an external sorter to group the data first.

      This ensures that we only need to open a single `OutputWriter` at a time. For data sources like Parquet, `OutputWriter`s can be quite memory consuming. One issue is that this approach breaks the row distribution of the original DataFrame. However, we didn't promise to preserve data distribution when writing a DataFrame.

- [x] More tests. Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR https://github.com/liancheng/spark/pull/6, which is against this PR branch and for preview only. A formal PR needs to be made after this one is merged.
Author: Cheng Lian <lian@databricks.com>

Closes #5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos. Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actual data files to FSBasedRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data sources customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttemptContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have a zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
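
For illustration, a minimal sketch (not part of this patch) of the new user-facing overloads, mirroring the calls exercised by FSBasedRelationSuite below. The SQLContext `sqlContext`, the DataFrame `df` (assumed to have columns a, b, p1, p2), and the output path are hypothetical.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.sources.SimpleTextSource
    import org.apache.spark.sql.types._

    // Schema of the non-partition columns; the partition columns p1 and p2 are encoded
    // in the directory layout (e.g. p1=1/p2=foo) and discovered at load time.
    val dataSchema = StructType(Seq(
      StructField("a", IntegerType, nullable = false),
      StructField("b", StringType, nullable = false)))

    // Write `df` partitioned dynamically by its p1 and p2 columns.
    df.save(
      source = classOf[SimpleTextSource].getCanonicalName,
      mode = SaveMode.Overwrite,
      options = Map("path" -> "/tmp/partitioned-output"),   // hypothetical path
      partitionColumns = Seq("p1", "p2"))

    // Read it back; only the data schema needs to be supplied, as JSON, via "dataSchema".
    val reloaded = sqlContext.load(
      source = classOf[SimpleTextSource].getCanonicalName,
      options = Map(
        "path" -> "/tmp/partitioned-output",
        "dataSchema" -> dataSchema.json))
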
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala              6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala         27
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala        2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala    525
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala      125
6 files changed, 675 insertions, 23 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbf48efb24..d754c8e3a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,25 +19,24 @@ package org.apache.spark.sql.hive
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.spark.Logging
-import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
import org.apache.spark.util.Utils
/* Implicit conversions */
@@ -98,6 +97,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
ResolvedDataSource(
hive,
userSpecifiedSchema,
+ Array.empty[String],
table.properties("spark.sql.sources.provider"),
options)
@@ -438,6 +438,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
desc.name,
hive.conf.defaultDataSourceName,
temporary = false,
+ Array.empty[String],
mode,
options = Map.empty[String, String],
child
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index be9249a8b1..d46a127d47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -221,14 +221,14 @@ private[hive] trait HiveStrategies {
object HiveDDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(
- tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
+ tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
ExecutedCommand(
CreateMetastoreDataSource(
tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
- case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
+ case CreateTableUsingAsSelect(tableName, provider, false, partitionCols, mode, opts, query) =>
val cmd =
- CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
+ CreateMetastoreDataSourceAsSelect(tableName, provider, partitionCols, mode, opts, query)
ExecutedCommand(cmd) :: Nil
case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index abab1a223a..8e405e0804 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -158,6 +158,7 @@ private[hive]
case class CreateMetastoreDataSourceAsSelect(
tableName: String,
provider: String,
+ partitionColumns: Array[String],
mode: SaveMode,
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
@@ -189,12 +190,12 @@ case class CreateMetastoreDataSourceAsSelect(
return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
- val resolved =
- ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath)
+ val resolved = ResolvedDataSource(
+ sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
- case l @ LogicalRelation(i: InsertableRelation) =>
- if (i != createdRelation.relation) {
+ case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+ if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
s"match the existing relation of $tableName. " +
@@ -202,14 +203,13 @@ case class CreateMetastoreDataSourceAsSelect(
s"table $tableName and using its data source and options."
val errorMessage =
s"""
- |$errorDescription
- |== Relations ==
- |${sideBySide(
- s"== Expected Relation ==" ::
- l.toString :: Nil,
- s"== Actual Relation ==" ::
- createdRelation.toString :: Nil).mkString("\n")}
- """.stripMargin
+ |$errorDescription
+ |== Relations ==
+ |${sideBySide(
+ s"== Expected Relation ==" :: l.toString :: Nil,
+ s"== Actual Relation ==" :: createdRelation.toString :: Nil
+ ).mkString("\n")}
+ """.stripMargin
throw new AnalysisException(errorMessage)
}
existingSchema = Some(l.schema)
@@ -234,7 +234,8 @@ case class CreateMetastoreDataSourceAsSelect(
}
// Create the relation based on the data of df.
- val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
+ val resolved =
+ ResolvedDataSource(sqlContext, provider, partitionColumns, mode, optionsWithPath, df)
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8398da2681..cbc381cc81 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -204,7 +204,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
if (string == null || string.isEmpty) {
defaultPartName
} else {
- FileUtils.escapePathName(string)
+ FileUtils.escapePathName(string, defaultPartName)
}
s"/$col=$colString"
}.mkString
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
new file mode 100644
index 0000000000..415b1cd168
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.types._
+
+// TODO Don't extend ParquetTest
+// This test suite extends ParquetTest for some convenient utility methods. These methods should be
+// moved to some more general places, maybe QueryTest.
+class FSBasedRelationSuite extends QueryTest with ParquetTest {
+ override val sqlContext: SQLContext = TestHive
+
+ import sqlContext._
+ import sqlContext.implicits._
+
+ val dataSchema =
+ StructType(
+ Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", StringType, nullable = false)))
+
+ val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
+
+ val partitionedTestDF1 = (for {
+ i <- 1 to 3
+ p2 <- Seq("foo", "bar")
+ } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
+
+ val partitionedTestDF2 = (for {
+ i <- 1 to 3
+ p2 <- Seq("foo", "bar")
+ } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
+
+ val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
+
+ def checkQueries(df: DataFrame): Unit = {
+ // Selects everything
+ checkAnswer(
+ df,
+ for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+
+ // Simple filtering and partition pruning
+ checkAnswer(
+ df.filter('a > 1 && 'p1 === 2),
+ for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
+
+ // Simple projection and filtering
+ checkAnswer(
+ df.filter('a > 1).select('b, 'a + 1),
+ for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
+
+ // Simple projection and partition pruning
+ checkAnswer(
+ df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
+ for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
+
+ // Self-join
+ df.registerTempTable("t")
+ withTempTable("t") {
+ checkAnswer(
+ sql(
+ """SELECT l.a, r.b, l.p1, r.p2
+ |FROM t l JOIN t r
+ |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
+ """.stripMargin),
+ for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+ }
+ }
+
+ test("save()/load() - non-partitioned table - Overwrite") {
+ withTempPath { file =>
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite)
+
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite)
+
+ checkAnswer(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)),
+ testDF.collect())
+ }
+ }
+
+ test("save()/load() - non-partitioned table - Append") {
+ withTempPath { file =>
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite)
+
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append)
+
+ checkAnswer(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)).orderBy("a"),
+ testDF.unionAll(testDF).orderBy("a").collect())
+ }
+ }
+
+ test("save()/load() - non-partitioned table - ErrorIfExists") {
+ withTempDir { file =>
+ intercept[RuntimeException] {
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.ErrorIfExists)
+ }
+ }
+ }
+
+ test("save()/load() - non-partitioned table - Ignore") {
+ withTempDir { file =>
+ testDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Ignore)
+
+ val path = new Path(file.getCanonicalPath)
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ assert(fs.listStatus(path).isEmpty)
+ }
+ }
+
+ test("save()/load() - partitioned table - simple queries") {
+ withTempPath { file =>
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.ErrorIfExists,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ checkQueries(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)))
+ }
+ }
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+ .saveAsTextFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchemaWithPartition.json)))
+ }
+ }
+
+ test("save()/load() - partitioned table - Overwrite") {
+ withTempPath { file =>
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ checkAnswer(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)),
+ partitionedTestDF.collect())
+ }
+ }
+
+ test("save()/load() - partitioned table - Append") {
+ withTempPath { file =>
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ checkAnswer(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)),
+ partitionedTestDF.unionAll(partitionedTestDF).collect())
+ }
+ }
+
+ test("save()/load() - partitioned table - Append - new partition values") {
+ withTempPath { file =>
+ partitionedTestDF1.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF2.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ checkAnswer(
+ load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchema.json)),
+ partitionedTestDF.collect())
+ }
+ }
+
+ test("save()/load() - partitioned table - ErrorIfExists") {
+ withTempDir { file =>
+ intercept[RuntimeException] {
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.ErrorIfExists,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+ }
+ }
+ }
+
+ test("save()/load() - partitioned table - Ignore") {
+ withTempDir { file =>
+ partitionedTestDF.save(
+ path = file.getCanonicalPath,
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Ignore)
+
+ val path = new Path(file.getCanonicalPath)
+ val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+ assert(fs.listStatus(path).isEmpty)
+ }
+ }
+
+ def withTable(tableName: String)(f: => Unit): Unit = {
+ try f finally sql(s"DROP TABLE $tableName")
+ }
+
+ test("saveAsTable()/load() - non-partitioned table - Overwrite") {
+ testDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ Map("dataSchema" -> dataSchema.json))
+
+ withTable("t") {
+ checkAnswer(table("t"), testDF.collect())
+ }
+ }
+
+ test("saveAsTable()/load() - non-partitioned table - Append") {
+ testDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite)
+
+ testDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append)
+
+ withTable("t") {
+ checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
+ }
+ }
+
+ test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
+ Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+ withTempTable("t") {
+ intercept[AnalysisException] {
+ testDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.ErrorIfExists)
+ }
+ }
+ }
+
+ test("saveAsTable()/load() - non-partitioned table - Ignore") {
+ Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+ withTempTable("t") {
+ testDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Ignore)
+
+ assert(table("t").collect().isEmpty)
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - simple queries") {
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ Map("dataSchema" -> dataSchema.json))
+
+ withTable("t") {
+ checkQueries(table("t"))
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - Overwrite") {
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ withTable("t") {
+ checkAnswer(table("t"), partitionedTestDF.collect())
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - Append") {
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ withTable("t") {
+ checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - Append - new partition values") {
+ partitionedTestDF1.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ partitionedTestDF2.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ withTable("t") {
+ checkAnswer(table("t"), partitionedTestDF.collect())
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+ partitionedTestDF1.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ // Using only a subset of all partition columns
+ intercept[Throwable] {
+ partitionedTestDF2.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1"))
+ }
+
+ // Using different order of partition columns
+ intercept[Throwable] {
+ partitionedTestDF2.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Append,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p2", "p1"))
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
+ Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+ withTempTable("t") {
+ intercept[AnalysisException] {
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.ErrorIfExists,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+ }
+ }
+ }
+
+ test("saveAsTable()/load() - partitioned table - Ignore") {
+ Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+ withTempTable("t") {
+ partitionedTestDF.saveAsTable(
+ tableName = "t",
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Ignore,
+ options = Map("dataSchema" -> dataSchema.json),
+ partitionColumns = Seq("p1", "p2"))
+
+ assert(table("t").collect().isEmpty)
+ }
+ }
+
+ test("Hadoop style globbing") {
+ withTempPath { file =>
+ partitionedTestDF.save(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ mode = SaveMode.Overwrite,
+ options = Map("path" -> file.getCanonicalPath),
+ partitionColumns = Seq("p1", "p2"))
+
+ val df = load(
+ source = classOf[SimpleTextSource].getCanonicalName,
+ options = Map(
+ "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+ "dataSchema" -> dataSchema.json))
+
+ val expectedPaths = Set(
+ s"${file.getCanonicalFile}/p1=1/p2=foo",
+ s"${file.getCanonicalFile}/p1=2/p2=foo",
+ s"${file.getCanonicalFile}/p1=1/p2=bar",
+ s"${file.getCanonicalFile}/p1=2/p2=bar"
+ ).map { p =>
+ val path = new Path(p)
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+ }
+
+ println(df.queryExecution)
+
+ val actualPaths = df.queryExecution.analyzed.collectFirst {
+ case LogicalRelation(relation: FSBasedRelation) =>
+ relation.paths.toSet
+ }.getOrElse {
+ fail("Expect an FSBasedRelation, but none could be found")
+ }
+
+ assert(actualPaths === expectedPaths)
+ checkAnswer(df, partitionedTestDF.collect())
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
new file mode 100644
index 0000000000..8801aba2f6
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.NumberFormat
+import java.util.UUID
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * A simple example [[FSBasedRelationProvider]].
+ */
+class SimpleTextSource extends FSBasedRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ paths: Array[String],
+ schema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): FSBasedRelation = {
+ val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
+ new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+ }
+}
+
+class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
+ val numberFormat = NumberFormat.getInstance()
+
+ numberFormat.setMinimumIntegerDigits(5)
+ numberFormat.setGroupingUsed(false)
+
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val split = context.getTaskAttemptID.getTaskID.getId
+ val name = FileOutputFormat.getOutputName(context)
+ new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+ }
+}
+
+class SimpleTextOutputWriter extends OutputWriter {
+ private var recordWriter: RecordWriter[NullWritable, Text] = _
+ private var taskAttemptContext: TaskAttemptContext = _
+
+ override def init(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): Unit = {
+ recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+ taskAttemptContext = context
+ }
+
+ override def write(row: Row): Unit = {
+ val serialized = row.toSeq.map(_.toString).mkString(",")
+ recordWriter.write(null, new Text(serialized))
+ }
+
+ override def close(): Unit = recordWriter.close(taskAttemptContext)
+}
+
+/**
+ * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma
+ * separated string lines. When scanning data, schema must be explicitly provided via data source
+ * option `"dataSchema"`.
+ */
+class SimpleTextRelation(
+ paths: Array[String],
+ val maybeDataSchema: Option[StructType],
+ partitionsSchema: StructType,
+ parameters: Map[String, String])(
+ @transient val sqlContext: SQLContext)
+ extends FSBasedRelation(paths, partitionsSchema) {
+
+ import sqlContext.sparkContext
+
+ override val dataSchema: StructType =
+ maybeDataSchema.getOrElse(DataType.fromJson(parameters("dataSchema")).asInstanceOf[StructType])
+
+ override def equals(other: Any): Boolean = other match {
+ case that: SimpleTextRelation =>
+ this.paths.sameElements(that.paths) &&
+ this.maybeDataSchema == that.maybeDataSchema &&
+ this.dataSchema == that.dataSchema &&
+ this.partitionColumns == that.partitionColumns
+
+ case _ => false
+ }
+
+ override def hashCode(): Int =
+ Objects.hashCode(paths, maybeDataSchema, dataSchema)
+
+ override def outputWriterClass: Class[_ <: OutputWriter] =
+ classOf[SimpleTextOutputWriter]
+
+ override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+ val fields = dataSchema.map(_.dataType)
+
+ sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+ Row(record.split(",").zip(fields).map { case (value, dataType) =>
+ Cast(Literal(value), dataType).eval()
+ }: _*)
+ }
+ }
+}
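
For reference, a minimal sketch (not part of this patch) of the `dataSchema` option round trip relied on above: the test suite serializes the data schema to JSON with `.json`, and SimpleTextRelation reconstructs it with `DataType.fromJson`.

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(
      StructField("a", IntegerType, nullable = false),
      StructField("b", StringType, nullable = false)))

    // What the test suite puts into the options map under "dataSchema" ...
    val schemaJson: String = dataSchema.json

    // ... and what SimpleTextRelation recovers from parameters("dataSchema").
    val recovered = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    require(recovered.fields.sameElements(dataSchema.fields))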