author     Cheng Lian <lian@databricks.com>           2015-05-13 11:04:10 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-05-13 11:04:21 -0700
commit     90f304b0c90b82f59bcb776feb6b2d845f7193ff
tree       0dcdef57cef3cb0aa84363396a262c8a84b2661b /sql/hive
parent     10007fbe0ba4f1d86645cc03ead3fb86dc96253b
[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation
This PR migrates the Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. The major differences are:
1. Partition discovery code has been factored out into `FSBasedRelation`.
2. `AppendingParquetOutputFormat` is no longer used. Instead, an anonymous subclass of `ParquetOutputFormat` handles appending and writing dynamic partitions.
3. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition.
4. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push-down, so it no longer extends `CatalystScan` (a minimal sketch of the source-level filter API follows this list).
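
To make point 4 concrete, here is a minimal, hypothetical sketch of push-down against the public `org.apache.spark.sql.sources.Filter` ADT. The relation name and schema are invented, and it uses the stable `PrunedFilteredScan` trait for brevity rather than `FSBasedRelation`'s own `buildScan` variant (which additionally receives the input paths to scan):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical relation (invented for illustration): push-down filters arrive as
// plain case classes such as GreaterThan("a", 1) from the public sources API,
// not as Catalyst expressions, so no CatalystScan is involved.
class ExampleRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", StringType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    filters.foreach {
      case GreaterThan("a", value) => // push `value` down into the underlying reader
      case _ => // filters left unhandled are re-evaluated by Spark on top of the scan
    }
    sqlContext.sparkContext.emptyRDD[Row] // a real source would return the matching rows
  }
}
```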
After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.
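
For a sense of the end-to-end behavior, the renamed test suite below drives any `FSBasedRelation`-backed source through the generic `save()`/`load()` API. A condensed, self-contained sketch of that usage (the output path is illustrative, and `TestHive` merely stands in for a `SQLContext`, as in the suite):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.test.TestHive

val sqlContext = TestHive   // mirrors the suite; any SQLContext would do
import sqlContext._
import sqlContext.implicits._

// Provider class behind FSBasedParquetRelation (see the renamed suite below).
val dataSourceName = classOf[org.apache.spark.sql.parquet.DefaultSource].getCanonicalName

val df = sparkContext.parallelize(1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")

df.save(
  path = "/tmp/fs-based-relation-demo",  // illustrative output path
  source = dataSourceName,
  mode = SaveMode.Overwrite)

val loaded = load(
  source = dataSourceName,
  options = Map("path" -> "/tmp/fs-based-relation-demo"))
loaded.show()
```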
Author: Cheng Lian <lian@databricks.com>
Closes #6090 from liancheng/parquet-migration and squashes the following commits:
6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
(cherry picked from commit 7ff16e8abef9fbf4a4855e23c256b22e62e560a6)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       |  25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  |  15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    |  16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala              |  35
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala) | 173
5 files changed, 149 insertions(+), 115 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d754c8e3a8..b0e82c8d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
     val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+      FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+      FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
 
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+      case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =
           parquetRelation.paths.toSet == pathsInMetastore.toSet &&
           logical.schema.sameType(metastoreSchema) &&
-          parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+          parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
+            PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+          }
 
         if (useCached) {
           Some(logical)
@@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       case other =>
         logWarning(
           s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
-            s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+            s"as Parquet. However, we are getting a $other from the metastore cache. " +
             s"This cached entry will be invalidated.")
         cachedDataSourceTables.invalidate(tableIdentifier)
         None
@@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
 
       val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
       val parquetRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+        val created = LogicalRelation(
+          new FSBasedParquetRelation(
+            paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
@@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
 
       val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
       val parquetRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        val created = LogicalRelation(
+          new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 47c60f651d..da5d203d9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,21 +21,18 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.BeforeAndAfterEach
-
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql._
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Tests for persisting tables created though the data sources API into the metastore.
@@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     )
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(p: ParquetRelation2) => // OK
+      case LogicalRelation(p: FSBasedParquetRelation) => // OK
       case _ =>
         fail(
           "test_parquet_ctas should be converted to " +
-            s"${classOf[ParquetRelation2].getCanonicalName}")
+            s"${classOf[FSBasedParquetRelation].getCanonicalName}")
     }
 
     // Clenup and reset confs.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a5744ccc68..1d6393a3fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.DefaultParserDialect
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest {
   def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
     val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
     relation match {
-      case LogicalRelation(r: ParquetRelation2) =>
+      case LogicalRelation(r: FSBasedParquetRelation) =>
         if (!isDataSourceParquet) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
-              s"${ParquetRelation2.getClass.getCanonicalName}.")
+              s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
         }
 
       case r: MetastoreRelation =>
         if (isDataSourceParquet) {
           fail(
-            s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+            s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
              s"${classOf[MetastoreRelation].getCanonicalName}.")
         }
     }
@@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest {
       sql(s"DROP TABLE $tableName")
     }
   }
-  
+
   test("SPARK-5203 union with different decimal precision") {
     Seq.empty[(Decimal, Decimal)]
       .toDF("d1", "d2")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bf1121ddf0..41bcbe84b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,16 +21,15 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.{QueryTest, SQLConf}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     )
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(p: ParquetRelation2) => // OK
-      case _ =>
-        fail(
-          s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+      case LogicalRelation(_: FSBasedParquetRelation) => // OK
+      case _ => fail(
+        "test_parquet_ctas should be converted to " +
+          s"${classOf[FSBasedParquetRelation].getCanonicalName}")
     }
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(
-        InsertIntoDataSource(
-          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[ParquetRelation2].getCanonicalName} and " +
-        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
         s"However, found a ${o.toString} ")
     }
@@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(
-        InsertIntoDataSource(
-          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
         s"However, found a ${o.toString} ")
     }
@@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     assertResult(2) {
       analyzed.collect {
-        case r @ LogicalRelation(_: ParquetRelation2) => r
+        case r @ LogicalRelation(_: FSBasedParquetRelation) => r
       }.size
     }
@@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Converted test_parquet should be cached.
     catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
       case null => fail("Converted test_parquet should be cached in the cache.")
-      case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+      case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
       case other =>
         fail(
           "The cached test_parquet should be a Parquet Relation. " +
@@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
     val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
     val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
 
-    intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+    intercept[Throwable](df2.saveAsParquetFile(filePath))
 
     val df3 = df2.toDF("str", "max_int")
     df3.saveAsParquetFile(filePath2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
index e8b48a0db1..394833f229 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types._
 // TODO Don't extend ParquetTest
 // This test suite extends ParquetTest for some convenient utility methods. These methods should be
 // moved to some more general places, maybe QueryTest.
-class FSBasedRelationSuite extends QueryTest with ParquetTest {
+class FSBasedRelationTest extends QueryTest with ParquetTest {
   override val sqlContext: SQLContext = TestHive
 
   import sqlContext._
   import sqlContext.implicits._
 
+  val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+
   val dataSchema =
     StructType(
       Seq(
@@ -92,17 +94,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempPath { file =>
       testDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite)
 
       testDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite)
 
       checkAnswer(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)),
@@ -114,17 +116,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempPath { file =>
       testDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite)
 
       testDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Append)
 
       checkAnswer(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)).orderBy("a"),
@@ -137,7 +139,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
       intercept[RuntimeException] {
         testDF.save(
           path = file.getCanonicalPath,
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           mode = SaveMode.ErrorIfExists)
       }
     }
@@ -147,7 +149,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempDir { file =>
      testDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Ignore)
 
       val path = new Path(file.getCanonicalPath)
@@ -159,62 +161,37 @@
   test("save()/load() - partitioned table - simple queries") {
     withTempPath { file =>
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.ErrorIfExists,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       checkQueries(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)))
     }
   }
 
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-
   test("save()/load() - partitioned table - Overwrite") {
     withTempPath { file =>
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       checkAnswer(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)),
@@ -225,20 +202,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("save()/load() - partitioned table - Append") {
     withTempPath { file =>
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Append,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       checkAnswer(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)),
@@ -249,20 +226,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("save()/load() - partitioned table - Append - new partition values") {
     withTempPath { file =>
       partitionedTestDF1.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       partitionedTestDF2.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Append,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       checkAnswer(
         load(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           options = Map(
             "path" -> file.getCanonicalPath,
             "dataSchema" -> dataSchema.json)),
@@ -274,7 +251,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempDir { file =>
       intercept[RuntimeException] {
         partitionedTestDF.save(
-          source = classOf[SimpleTextSource].getCanonicalName,
+          source = dataSourceName,
           mode = SaveMode.ErrorIfExists,
           options = Map("path" -> file.getCanonicalPath),
           partitionColumns = Seq("p1", "p2"))
@@ -286,7 +263,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempDir { file =>
       partitionedTestDF.save(
         path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Ignore)
 
       val path = new Path(file.getCanonicalPath)
@@ -302,7 +279,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - non-partitioned table - Overwrite") {
     testDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       Map("dataSchema" -> dataSchema.json))
@@ -314,12 +291,12 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - non-partitioned table - Append") {
     testDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite)
 
     testDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Append)
 
     withTable("t") {
@@ -334,7 +311,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
      intercept[AnalysisException] {
        testDF.saveAsTable(
          tableName = "t",
-         source = classOf[SimpleTextSource].getCanonicalName,
+         source = dataSourceName,
          mode = SaveMode.ErrorIfExists)
      }
    }
@@ -346,7 +323,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempTable("t") {
       testDF.saveAsTable(
         tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Ignore)
 
       assert(table("t").collect().isEmpty)
@@ -356,7 +333,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - partitioned table - simple queries") {
     partitionedTestDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       Map("dataSchema" -> dataSchema.json))
@@ -368,14 +345,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - partitioned table - Overwrite") {
     partitionedTestDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
 
     partitionedTestDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
@@ -388,14 +365,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - partitioned table - Append") {
     partitionedTestDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
 
     partitionedTestDF.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Append,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
@@ -408,14 +385,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - partitioned table - Append - new partition values") {
     partitionedTestDF1.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
 
     partitionedTestDF2.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Append,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
@@ -428,7 +405,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
     partitionedTestDF1.saveAsTable(
       tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
+      source = dataSourceName,
       mode = SaveMode.Overwrite,
       options = Map("dataSchema" -> dataSchema.json),
       partitionColumns = Seq("p1", "p2"))
@@ -437,7 +414,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     intercept[Throwable] {
       partitionedTestDF2.saveAsTable(
         tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Append,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p1"))
@@ -447,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     intercept[Throwable] {
       partitionedTestDF2.saveAsTable(
         tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Append,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p2", "p1"))
@@ -461,7 +438,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     intercept[AnalysisException] {
       partitionedTestDF.saveAsTable(
         tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.ErrorIfExists,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p1", "p2"))
@@ -475,7 +452,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     withTempTable("t") {
       partitionedTestDF.saveAsTable(
         tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Ignore,
         options = Map("dataSchema" -> dataSchema.json),
         partitionColumns = Seq("p1", "p2"))
@@ -487,13 +464,13 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
   test("Hadoop style globbing") {
     withTempPath { file =>
       partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         mode = SaveMode.Overwrite,
         options = Map("path" -> file.getCanonicalPath),
         partitionColumns = Seq("p1", "p2"))
 
       val df = load(
-        source = classOf[SimpleTextSource].getCanonicalName,
+        source = dataSourceName,
         options = Map(
           "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
           "dataSchema" -> dataSchema.json))
@@ -521,3 +498,67 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
     }
   }
 }
+
+class SimpleTextRelationSuite extends FSBasedRelationTest {
+  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  import sqlContext._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+          .saveAsTextFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}
+
+class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { file =>
+      val basePath = new Path(file.getCanonicalPath)
+      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+      val qualifiedBasePath = fs.makeQualified(basePath)
+
+      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+        sparkContext
+          .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+          .toDF("a", "b", "p1")
+          .saveAsParquetFile(partitionDir.toString)
+      }
+
+      val dataSchemaWithPartition =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchemaWithPartition.json)))
+    }
+  }
+}