author     Cheng Lian <lian@databricks.com>           2015-05-13 11:04:10 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-05-13 11:04:10 -0700
commit     7ff16e8abef9fbf4a4855e23c256b22e62e560a6 (patch)
tree       1be1249ecb9db02ef5bf8820f7c44a7fbe71a6ff /sql/hive/src
parent     bec938f777a2e18757c7d04504d86a5342e2b49e (diff)
[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation
This PR migrates the Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are:

1. Partition discovery code has been factored out to `FSBasedRelation`.
2. `AppendingParquetOutputFormat` is no longer used. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions.
3. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition.
4. `FSBasedParquetRelation` no longer relies on Catalyst expressions for filter push-down, so it no longer extends `CatalystScan`.

After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`.

Author: Cheng Lian <lian@databricks.com>

Closes #6090 from liancheng/parquet-migration and squashes the following commits:

6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommitter
bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing
f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist
261d8c1 [Cheng Lian] Minor bug fix and more tests
db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
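For context, a minimal sketch of how the caller-side construction changes in `HiveMetastoreCatalog` (signatures taken from the hunks below; `paths`, `parquetOptions`, `partitionSpec`, and the `hive` context are placeholders, and these are `private[sql]` internals rather than public API, so the snippet is illustrative only):

```scala
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
import org.apache.spark.sql.types.StructType

// Before: ParquetRelation2 carried its own optional partition spec and took
// paths as a Seq[String]:
//   LogicalRelation(
//     ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))

// After: FSBasedParquetRelation takes paths as an Array[String]; partition
// handling lives in FSBasedRelation.
val relation = LogicalRelation(
  new FSBasedParquetRelation(
    paths.toArray,          // file system paths backing the table
    None,                   // no user-specified data schema
    Some(partitionSpec),    // partition spec derived from Hive metastore partitions
    parquetOptions)(hive))  // e.g. METASTORE_SCHEMA / MERGE_SCHEMA options

// "No partitions" is now represented explicitly rather than as None:
val emptySpec = PartitionSpec(StructType(Nil), Array.empty[Partition])
```

Normalizing an absent spec to an empty `PartitionSpec` is what lets the cache-hit check in `getCached` compare `parquetRelation.partitionSpec` uniformly for both partitioned and unpartitioned tables.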
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala        25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala   15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala     16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala               35
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala)  173
5 files changed, 149 insertions, 115 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d754c8e3a8..b0e82c8d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
import org.apache.spark.util.Utils
/* Implicit conversions */
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+ FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+ FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
- parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+ parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
+ PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+ }
if (useCached) {
Some(logical)
@@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
- s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+ s"as Parquet. However, we are getting a $other from the metastore cache. " +
s"This cached entry will be invalidated.")
cachedDataSourceTables.invalidate(tableIdentifier)
None
@@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
- val created =
- LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+ val created = LogicalRelation(
+ new FSBasedParquetRelation(
+ paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
- val created =
- LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+ val created = LogicalRelation(
+ new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 47c60f651d..da5d203d9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,21 +21,18 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.BeforeAndAfterEach
-
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql._
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* Tests for persisting tables created though the data sources API into the metastore.
@@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
+ case LogicalRelation(p: FSBasedParquetRelation) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[ParquetRelation2].getCanonicalName}")
+ s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
// Clenup and reset confs.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a5744ccc68..1d6393a3fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.DefaultParserDialect
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
@@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation2) =>
+ case LogicalRelation(r: FSBasedParquetRelation) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${ParquetRelation2.getClass.getCanonicalName}.")
+ s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+ s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
@@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest {
sql(s"DROP TABLE $tableName")
}
}
-
+
test("SPARK-5203 union with different decimal precision") {
Seq.empty[(Decimal, Decimal)]
.toDF("d1", "d2")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bf1121ddf0..41bcbe84b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,16 +21,15 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.{QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
@@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
- case _ =>
- fail(
- s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+ case LogicalRelation(_: FSBasedParquetRelation) => // OK
+ case _ => fail(
+ "test_parquet_ctas should be converted to " +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(
- InsertIntoDataSource(
- LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(
- InsertIntoDataSource(
- LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+ case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: ParquetRelation2) => r
+ case r @ LogicalRelation(_: FSBasedParquetRelation) => r
}.size
}
@@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+ case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
@@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
- intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+ intercept[Throwable](df2.saveAsParquetFile(filePath))
val df3 = df2.toDF("str", "max_int")
df3.saveAsParquetFile(filePath2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
index e8b48a0db1..394833f229 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types._
// TODO Don't extend ParquetTest
// This test suite extends ParquetTest for some convenient utility methods. These methods should be
// moved to some more general places, maybe QueryTest.
-class FSBasedRelationSuite extends QueryTest with ParquetTest {
+class FSBasedRelationTest extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestHive
import sqlContext._
import sqlContext.implicits._
+ val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+
val dataSchema =
StructType(
Seq(
@@ -92,17 +94,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -114,17 +116,17 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempPath { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append)
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)).orderBy("a"),
@@ -137,7 +139,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[RuntimeException] {
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@@ -147,7 +149,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
testDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@@ -159,62 +161,37 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - simple queries") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkQueries(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)))
}
}
- test("save()/load() - partitioned table - simple queries - partition columns in data") {
- withTempDir { file =>
- val basePath = new Path(file.getCanonicalPath)
- val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
- val qualifiedBasePath = fs.makeQualified(basePath)
-
- for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
- val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
- sparkContext
- .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
- .saveAsTextFile(partitionDir.toString)
- }
-
- val dataSchemaWithPartition =
- StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
- checkQueries(
- load(
- source = classOf[SimpleTextSource].getCanonicalName,
- options = Map(
- "path" -> file.getCanonicalPath,
- "dataSchema" -> dataSchemaWithPartition.json)))
- }
- }
-
test("save()/load() - partitioned table - Overwrite") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -225,20 +202,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -249,20 +226,20 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Append - new partition values") {
withTempPath { file =>
partitionedTestDF1.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
checkAnswer(
load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchema.json)),
@@ -274,7 +251,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
intercept[RuntimeException] {
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
@@ -286,7 +263,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempDir { file =>
partitionedTestDF.save(
path = file.getCanonicalPath,
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
val path = new Path(file.getCanonicalPath)
@@ -302,7 +279,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@@ -314,12 +291,12 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - non-partitioned table - Append") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append)
withTable("t") {
@@ -334,7 +311,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists)
}
}
@@ -346,7 +323,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
testDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore)
assert(table("t").collect().isEmpty)
@@ -356,7 +333,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - simple queries") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
@@ -368,14 +345,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Overwrite") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -388,14 +365,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -408,14 +385,14 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - new partition values") {
partitionedTestDF1.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -428,7 +405,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
partitionedTestDF1.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -437,7 +414,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1"))
@@ -447,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[Throwable] {
partitionedTestDF2.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Append,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p2", "p1"))
@@ -461,7 +438,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
intercept[AnalysisException] {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.ErrorIfExists,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -475,7 +452,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
withTempTable("t") {
partitionedTestDF.saveAsTable(
tableName = "t",
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Ignore,
options = Map("dataSchema" -> dataSchema.json),
partitionColumns = Seq("p1", "p2"))
@@ -487,13 +464,13 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
test("Hadoop style globbing") {
withTempPath { file =>
partitionedTestDF.save(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
mode = SaveMode.Overwrite,
options = Map("path" -> file.getCanonicalPath),
partitionColumns = Seq("p1", "p2"))
val df = load(
- source = classOf[SimpleTextSource].getCanonicalName,
+ source = dataSourceName,
options = Map(
"path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
"dataSchema" -> dataSchema.json))
@@ -521,3 +498,67 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
}
}
}
+
+class SimpleTextRelationSuite extends FSBasedRelationTest {
+ override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+ import sqlContext._
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+ .saveAsTextFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ load(
+ source = dataSourceName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchemaWithPartition.json)))
+ }
+ }
+}
+
+class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+ override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+ import sqlContext._
+ import sqlContext.implicits._
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
+ .toDF("a", "b", "p1")
+ .saveAsParquetFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ load(
+ source = dataSourceName,
+ options = Map(
+ "path" -> file.getCanonicalPath,
+ "dataSchema" -> dataSchemaWithPartition.json)))
+ }
+ }
+}