path: root/sql/hive
author      Cheng Lian <lian@databricks.com>    2015-07-26 16:49:19 -0700
committer   Reynold Xin <rxin@databricks.com>   2015-07-26 16:49:19 -0700
commit      c025c3d0a1fdfbc45b64db9c871176b40b4a7b9b (patch)
tree        71d106872ef4a948f187433ad4cd291427949745 /sql/hive
parent      6b2baec04fa3d928f0ee84af8c2723ac03a4648c (diff)
[SPARK-9095] [SQL] Removes the old Parquet support
This PR removes the old Parquet support:

- Removes the old `ParquetRelation` together with its related SQL configuration, plan nodes, strategies, utility classes, and test suites.
- Renames `ParquetRelation2` to `ParquetRelation`.
- Renames `RowReadSupport` and `RowRecordMaterializer` to `CatalystReadSupport` and `CatalystRecordMaterializer` respectively, and moves them to separate files. This follows the naming convention used by other Parquet data models implemented in parquet-mr, so it should be easier for developers who are familiar with Parquet to follow.

There's still some other code that can be cleaned up, especially `RowWriteSupport`, but I'd like to leave that part to SPARK-8848.

Author: Cheng Lian <lian@databricks.com>

Closes #7441 from liancheng/spark-9095 and squashes the following commits:

c7b6e38 [Cheng Lian] Removes WriteToFile
2d688d6 [Cheng Lian] Renames ParquetRelation2 to ParquetRelation
ca9e1b7 [Cheng Lian] Removes old Parquet support
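Editorial sketch (not part of the commit): the shape of the rename as it appears in the updated test code below. Plan-matching code that previously bound a `ParquetRelation2` inside a `LogicalRelation` now binds a `ParquetRelation`; the helper name here is invented for illustration, but the match mirrors `collectParquetRelation` in `parquetSuites.scala`.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.parquet.ParquetRelation

    object ParquetPlanInspection {
      // Hypothetical helper (name made up for this sketch): scan the analyzed
      // plan for the data source Parquet relation, which this PR renames from
      // ParquetRelation2 to ParquetRelation.
      def findParquetRelation(df: DataFrame): Option[ParquetRelation] =
        df.queryExecution.analyzed.collectFirst {
          case LogicalRelation(r: ParquetRelation) => r // was ParquetRelation2 before this PR
        }
    }

The same one-line substitution accounts for most of the test-suite churn in the diffstat below; the larger deletions come from dropping the old `ParquetConversion` strategy and the `spark.sql.parquet.useDataSourceApi`-dependent suites.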
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      |  22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala            | 141
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala          |  86
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala   |  54
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala             | 174
7 files changed, 106 insertions, 387 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4cdb83c511..1b8edefef4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -444,9 +444,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
HiveDDLStrategy,
DDLStrategy,
TakeOrderedAndProject,
- ParquetOperations,
InMemoryScans,
- ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0a2121c955..2629235312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.Warehouse
@@ -30,7 +29,6 @@ import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -39,10 +37,11 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
@@ -260,8 +259,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -272,7 +271,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -317,7 +316,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new ParquetRelation2(
+ new ParquetRelation(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -330,7 +329,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
+ new ParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
@@ -370,8 +369,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
- *
- * This rule can be considered as [[HiveStrategies.ParquetConversion]] done right.
*/
object ParquetConversions extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -386,7 +383,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -397,7 +393,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -406,7 +401,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
- conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a22c3292ef..cd6cd322c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,23 +17,14 @@
package org.apache.spark.sql.hive
-import scala.collection.JavaConversions._
-
-import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.types.StringType
private[hive] trait HiveStrategies {
@@ -42,136 +33,6 @@ private[hive] trait HiveStrategies {
val hiveContext: HiveContext
- /**
- * :: Experimental ::
- * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
- * table scan operator.
- *
- * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
- * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
- *
- * Other issues:
- * - Much of this logic assumes case insensitive resolution.
- */
- @Experimental
- object ParquetConversion extends Strategy {
- implicit class LogicalPlanHacks(s: DataFrame) {
- def lowerCase: DataFrame = DataFrame(s.sqlContext, s.logicalPlan)
-
- def addPartitioningAttributes(attrs: Seq[Attribute]): DataFrame = {
- // Don't add the partitioning key if its already present in the data.
- if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
- s
- } else {
- DataFrame(
- s.sqlContext,
- s.logicalPlan transform {
- case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
- })
- }
- }
- }
-
- implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
- def fakeOutput(newOutput: Seq[Attribute]): OutputFaker =
- OutputFaker(
- originalPlan.output.map(a =>
- newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
- .getOrElse(
- sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
- originalPlan)
- }
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
- if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
- hiveContext.convertMetastoreParquet &&
- !hiveContext.conf.parquetUseDataSourceApi =>
-
- // Filter out all predicates that only deal with partition keys
- val partitionsKeys = AttributeSet(relation.partitionKeys)
- val (pruningPredicates, otherPredicates) = predicates.partition {
- _.references.subsetOf(partitionsKeys)
- }
-
- // We are going to throw the predicates and projection back at the whole optimization
- // sequence so lets unresolve all the attributes, allowing them to be rebound to the
- // matching parquet attributes.
- val unresolvedOtherPredicates = Column(otherPredicates.map(_ transform {
- case a: AttributeReference => UnresolvedAttribute(a.name)
- }).reduceOption(And).getOrElse(Literal(true)))
-
- val unresolvedProjection: Seq[Column] = projectList.map(_ transform {
- case a: AttributeReference => UnresolvedAttribute(a.name)
- }).map(Column(_))
-
- try {
- if (relation.hiveQlTable.isPartitioned) {
- val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
- // Translate the predicate so that it automatically casts the input values to the
- // correct data types during evaluation.
- val castedPredicate = rawPredicate transform {
- case a: AttributeReference =>
- val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
- val key = relation.partitionKeys(idx)
- Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
- }
-
- val inputData = new GenericMutableRow(relation.partitionKeys.size)
- val pruningCondition =
- if (codegenEnabled) {
- GeneratePredicate.generate(castedPredicate)
- } else {
- InterpretedPredicate.create(castedPredicate)
- }
-
- val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
- val partitionValues = part.getValues
- var i = 0
- while (i < partitionValues.size()) {
- inputData(i) = CatalystTypeConverters.convertToCatalyst(partitionValues(i))
- i += 1
- }
- pruningCondition(inputData)
- }
-
- val partitionLocations = partitions.map(_.getLocation)
-
- if (partitionLocations.isEmpty) {
- PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
- } else {
- hiveContext
- .read.parquet(partitionLocations: _*)
- .addPartitioningAttributes(relation.partitionKeys)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection: _*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
- }
-
- } else {
- hiveContext
- .read.parquet(relation.hiveQlTable.getDataLocation.toString)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection: _*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
- }
- } catch {
- // parquetFile will throw an exception when there is no data.
- // TODO: Remove this hack for Spark 1.3.
- case iae: java.lang.IllegalArgumentException
- if iae.getMessage.contains("Can not create a Path from an empty string") =>
- PhysicalRDD(plan.output, sparkContext.emptyRDD[InternalRow]) :: Nil
- }
- case _ => Nil
- }
- }
-
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index af68615e8e..a45c2d9572 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{QueryTest, Row}
case class Cases(lower: String, UPPER: String)
@@ -28,64 +28,54 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
import sqlContext._
- def run(prefix: String): Unit = {
- test(s"$prefix: Case insensitive attribute names") {
- withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
- val expected = (1 to 4).map(i => Row(i.toString))
- checkAnswer(sql("SELECT upper FROM cases"), expected)
- checkAnswer(sql("SELECT LOWER FROM cases"), expected)
- }
+ test("Case insensitive attribute names") {
+ withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+ val expected = (1 to 4).map(i => Row(i.toString))
+ checkAnswer(sql("SELECT upper FROM cases"), expected)
+ checkAnswer(sql("SELECT LOWER FROM cases"), expected)
}
+ }
- test(s"$prefix: SELECT on Parquet table") {
- val data = (1 to 4).map(i => (i, s"val_$i"))
- withParquetTable(data, "t") {
- checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
- }
+ test("SELECT on Parquet table") {
+ val data = (1 to 4).map(i => (i, s"val_$i"))
+ withParquetTable(data, "t") {
+ checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
}
+ }
- test(s"$prefix: Simple column projection + filter on Parquet table") {
- withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
- checkAnswer(
- sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
- Seq(Row(true, "val_2"), Row(true, "val_4")))
- }
+ test("Simple column projection + filter on Parquet table") {
+ withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+ checkAnswer(
+ sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+ Seq(Row(true, "val_2"), Row(true, "val_4")))
}
+ }
- test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
- withTempPath { dir =>
- sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
- read.parquet(dir.getCanonicalPath).registerTempTable("p")
- withTempTable("p") {
- checkAnswer(
- sql("SELECT * FROM src ORDER BY key"),
- sql("SELECT * from p ORDER BY key").collect().toSeq)
- }
+ test("Converting Hive to Parquet Table via saveAsParquetFile") {
+ withTempPath { dir =>
+ sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
+ read.parquet(dir.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ checkAnswer(
+ sql("SELECT * FROM src ORDER BY key"),
+ sql("SELECT * from p ORDER BY key").collect().toSeq)
}
}
+ }
- test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
- withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
- withTempPath { file =>
- sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
- read.parquet(file.getCanonicalPath).registerTempTable("p")
- withTempTable("p") {
- // let's do three overwrites for good measure
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
- checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
- }
+ test("INSERT OVERWRITE TABLE Parquet table") {
+ withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
+ withTempPath { file =>
+ sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
+ read.parquet(file.getCanonicalPath).registerTempTable("p")
+ withTempTable("p") {
+ // let's do three overwrites for good measure
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+ checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
}
}
}
}
-
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
- run("Parquet data source enabled")
- }
-
- withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "false") {
- run("Parquet data source disabled")
- }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e403f32efa..4fdf774ead 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,10 +21,9 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.BeforeAndAfterAll
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark.Logging
import org.apache.spark.sql._
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -564,10 +563,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
}
test("scan a parquet table created through a CTAS statement") {
- withSQLConf(
- HiveContext.CONVERT_METASTORE_PARQUET.key -> "true",
- SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
-
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") {
withTempTable("jt") {
(1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
@@ -582,9 +578,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
Row(3) :: Row(4) :: Nil)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: ParquetRelation2) => // OK
+ case LogicalRelation(p: ParquetRelation) => // OK
case _ =>
- fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
+ fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 0342826542..ff42fdefaa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
case class Nested1(f1: Nested2)
@@ -61,7 +62,9 @@ class MyDialect extends DefaultParserDialect
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
* valid, but Hive currently cannot execute it.
*/
-class SQLQuerySuite extends QueryTest {
+class SQLQuerySuite extends QueryTest with SQLTestUtils {
+ override def sqlContext: SQLContext = TestHive
+
test("SPARK-6835: udtf in lateral view") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.registerTempTable("table1")
@@ -195,17 +198,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: ParquetRelation2) =>
+ case LogicalRelation(r: ParquetRelation) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${ParquetRelation2.getClass.getCanonicalName}.")
+ s"${ParquetRelation.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+ s"${ParquetRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
@@ -350,33 +353,26 @@ class SQLQuerySuite extends QueryTest {
"serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
)
- val origUseParquetDataSource = conf.parquetUseDataSourceApi
- try {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- sql(
- """CREATE TABLE ctas5
- | STORED AS parquet AS
- | SELECT key, value
- | FROM src
- | ORDER BY key, value""".stripMargin).collect()
-
- checkExistence(sql("DESC EXTENDED ctas5"), true,
- "name:key", "type:string", "name:value", "ctas5",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
- "MANAGED_TABLE"
- )
-
- val default = convertMetastoreParquet
- // use the Hive SerDe for parquet tables
- sql("set spark.sql.hive.convertMetastoreParquet = false")
+ sql(
+ """CREATE TABLE ctas5
+ | STORED AS parquet AS
+ | SELECT key, value
+ | FROM src
+ | ORDER BY key, value""".stripMargin).collect()
+
+ checkExistence(sql("DESC EXTENDED ctas5"), true,
+ "name:key", "type:string", "name:value", "ctas5",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+ "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+ "MANAGED_TABLE"
+ )
+
+ // use the Hive SerDe for parquet tables
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
checkAnswer(
sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
- sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
- } finally {
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 82a8daf8b4..f56fb96c52 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,13 +22,13 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -57,7 +57,7 @@ case class ParquetDataWithKeyAndComplexTypes(
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
-class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
+class ParquetMetastoreSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -134,6 +134,19 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
""")
+ sql(
+ """
+ |create table test_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}
@@ -166,6 +179,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql("DROP TABLE normal_parquet")
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS jt_array")
+ sql("DROP TABLE IF EXISTS test_parquet")
setConf(HiveContext.CONVERT_METASTORE_PARQUET, false)
}
@@ -176,40 +190,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
- case _: ParquetTableScan => true
case _: PhysicalRDD => true
}.nonEmpty)
}
-}
-
-class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
-
- sql(
- """
- |create table test_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- sql("DROP TABLE IF EXISTS test_parquet")
-
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
test("scan an empty parquet table") {
checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
@@ -292,10 +275,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: ParquetRelation2) => // OK
+ case LogicalRelation(_: ParquetRelation) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[ParquetRelation2].getCanonicalName}")
+ s"${classOf[ParquetRelation].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,9 +299,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -346,9 +329,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[ParquetRelation2].getCanonicalName} and " +
+ s"${classOf[ParquetRelation].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -379,17 +362,17 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: ParquetRelation2) => r
+ case r @ LogicalRelation(_: ParquetRelation) => r
}.size
}
sql("DROP TABLE ms_convert")
}
- def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+ def collectParquetRelation(df: DataFrame): ParquetRelation = {
val plan = df.queryExecution.analyzed
plan.collectFirst {
- case LogicalRelation(r: ParquetRelation2) => r
+ case LogicalRelation(r: ParquetRelation) => r
}.getOrElse {
fail(s"Expecting a ParquetRelation2, but got:\n$plan")
}
@@ -439,7 +422,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
@@ -543,81 +526,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
}
}
-class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-
- test("MetastoreRelation in InsertIntoTable will not be converted") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.executedPlan match {
- case insert: execution.InsertIntoHiveTable => // OK
- case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
- s"However, found ${o.toString}.")
- }
-
- checkAnswer(
- sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
- sql("SELECT a FROM jt WHERE jt.a > 5").collect()
- )
-
- sql("DROP TABLE IF EXISTS test_insert_parquet")
- }
-
- // TODO: enable it after the fix of SPARK-5950.
- ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | int_array array<int>
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.executedPlan match {
- case insert: execution.InsertIntoHiveTable => // OK
- case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
- s"However, found ${o.toString}.")
- }
-
- checkAnswer(
- sql("SELECT int_array FROM test_insert_parquet"),
- sql("SELECT a FROM jt_array").collect()
- )
-
- sql("DROP TABLE IF EXISTS test_insert_parquet")
- }
-}
-
/**
* A suite of tests for the Parquet support through the data sources API.
*/
-class ParquetSourceSuiteBase extends ParquetPartitioningTest {
+class ParquetSourceSuite extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -712,20 +624,6 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
}
}
}
-}
-
-class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
test("values in arrays and maps stored in parquet are always nullable") {
val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
@@ -734,7 +632,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
val expectedSchema1 =
StructType(
StructField("m", mapType1, nullable = true) ::
- StructField("a", arrayType1, nullable = true) :: Nil)
+ StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
df.write.format("parquet").saveAsTable("alwaysNullable")
@@ -772,20 +670,6 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
}
}
-class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
- val originalConf = conf.parquetUseDataSourceApi
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
- }
-}
-
/**
* A collection of tests for parquet data with various forms of partitioning.
*/