author    Cheng Lian <lian@databricks.com>    2015-02-16 01:38:31 -0800
committer Cheng Lian <lian@databricks.com>    2015-02-16 01:38:31 -0800
commit    3ce58cf9c0ffe8b867ca79b404fe3fa291cf0e56 (patch)
tree      a583c820c1cecd46fb021323d88ac3e50af01b98 /sql/hive
parent    199a9e80275ac70582ea32f0f2f5a0a15b168785 (diff)
[SPARK-4553] [SPARK-5767] [SQL] Wires Parquet data source with the newly introduced write support for data source API
This PR migrates the Parquet data source to the new data source write support API. Users can now also overwrite and append to existing tables. Note that inserting into partitioned tables is not supported yet.

When the Parquet data source is enabled, insertion into Hive Metastore Parquet tables is also handled by the Parquet data source. This is done by the newly introduced `HiveMetastoreCatalog.ParquetConversions` rule, which is a "proper" implementation of the original hacky `HiveStrategies.ParquetConversion`. The latter is still preserved, and can be removed together with the old Parquet support in the future.

TODO:

- [x] Update outdated comments in `newParquet.scala`.

Author: Cheng Lian <lian@databricks.com>

Closes #4563 from liancheng/parquet-refining and squashes the following commits:

fa98d27 [Cheng Lian] Fixes test cases which should disable the Parquet data source
2476e82 [Cheng Lian] Fixes compilation error introduced during rebasing
a83d290 [Cheng Lian] Passes Hive Metastore partitioning information to ParquetRelation2
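For context, a minimal sketch of what the new write path enables from the user's side. The table names below are hypothetical, the SQL forms are illustrative, and it assumes existing non-partitioned Metastore Parquet tables with the `SQLConf.PARQUET_USE_DATA_SOURCE_API` flag enabled, so that insertion goes through the data source write support instead of the old Parquet code path:

    // Hypothetical example; `parquet_logs` and `staging_logs` are assumed to be
    // existing non-partitioned Parquet tables registered in the Metastore.
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext("local[2]", "parquet-write-sketch")
    val hiveContext = new HiveContext(sc)

    // Appends to the existing table's contents.
    hiveContext.sql("INSERT INTO TABLE parquet_logs SELECT * FROM staging_logs")

    // Replaces the existing table's contents.
    hiveContext.sql("INSERT OVERWRITE TABLE parquet_logs SELECT * FROM staging_logs")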
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala          |  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 127
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala       |  22
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala     | 247
4 files changed, 257 insertions, 150 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ddc7b181d4..87b380f950 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -22,26 +22,24 @@ import java.sql.Timestamp
import scala.collection.JavaConversions._
import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
-import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
+import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.sources.DataSourceStrategy
import org.apache.spark.sql.types._
/**
@@ -244,6 +242,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedRules =
+ catalog.ParquetConversions ::
catalog.CreateTables ::
catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index eb1ee54247..6d794d0e11 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,25 +20,25 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}
-import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
-
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
+import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
+import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -101,16 +101,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
- /** *
- * Creates a data source table (a table created with USING clause) in Hive's metastore.
- * Returns true when the table has been created. Otherwise, false.
- * @param tableName
- * @param userSpecifiedSchema
- * @param provider
- * @param options
- * @param isExternal
- * @return
- */
+ /**
+ * Creates a data source table (a table created with USING clause) in Hive's metastore.
+ * Returns true when the table has been created. Otherwise, false.
+ */
def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
@@ -141,7 +135,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
def hiveDefaultTableFilePath(tableName: String): String = {
- val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase())
+ val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
hiveWarehouse.getTablePath(currentDatabase, tableName).toString
}
@@ -176,25 +170,41 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
Nil
}
- val relation = MetastoreRelation(
- databaseName, tblName, alias)(
- table.getTTable, partitions.map(part => part.getTPartition))(hive)
-
- if (hive.convertMetastoreParquet &&
- hive.conf.parquetUseDataSourceApi &&
- relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) {
- val metastoreSchema = StructType.fromAttributes(relation.output)
- val paths = if (relation.hiveQlTable.isPartitioned) {
- relation.hiveQlPartitions.map(p => p.getLocation)
- } else {
- Seq(relation.hiveQlTable.getDataLocation.toString)
- }
+ MetastoreRelation(databaseName, tblName, alias)(
+ table.getTTable, partitions.map(part => part.getTPartition))(hive)
+ }
+ }
- LogicalRelation(ParquetRelation2(
- paths, Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
- } else {
- relation
+ private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
+ val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+
+ // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+ // serialize the Metastore schema to JSON and pass it as a data source option because of the
+ // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+ if (metastoreRelation.hiveQlTable.isPartitioned) {
+ val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
+ val partitionColumnDataTypes = partitionSchema.map(_.dataType)
+ val partitions = metastoreRelation.hiveQlPartitions.map { p =>
+ val location = p.getLocation
+ val values = Row.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
+ case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
+ })
+ ParquetPartition(values, location)
}
+ val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ val paths = partitions.map(_.path)
+ LogicalRelation(
+ ParquetRelation2(
+ paths,
+ Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
+ None,
+ Some(partitionSpec))(hive))
+ } else {
+ val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+ LogicalRelation(
+ ParquetRelation2(
+ paths,
+ Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
}
}
@@ -261,9 +271,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
- import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.io.Text
+ import org.apache.hadoop.mapred.TextInputFormat
tbl.setInputFormatClass(classOf[TextInputFormat])
tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
@@ -386,12 +396,55 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
/**
+ * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
+ * data source relations for better performance.
+ *
+ * This rule can be considered as [[HiveStrategies.ParquetConversion]] done right.
+ */
+ object ParquetConversions extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ // Collects all `MetastoreRelation`s which should be replaced
+ val toBeReplaced = plan.collect {
+ // Write path
+ case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+ // Inserting into partitioned table is not supported in Parquet data source (yet).
+ if !relation.hiveQlTable.isPartitioned &&
+ hive.convertMetastoreParquet &&
+ hive.conf.parquetUseDataSourceApi &&
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ relation
+
+ // Read path
+ case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
+ if hive.convertMetastoreParquet &&
+ hive.conf.parquetUseDataSourceApi &&
+ relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+ relation
+ }
+
+ // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
+ // attribute IDs referenced in other nodes.
+ toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
+ val parquetRelation = convertToParquetRelation(relation)
+ val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
+
+ lastPlan.transformUp {
+ case r: MetastoreRelation if r == relation => parquetRelation
+ case other => other.transformExpressions {
+ case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Creates any tables required for query execution.
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
import org.apache.hadoop.hive.ql.Context
- import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+ import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
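The `convertToParquetRelation` method in the diff above turns each Hive partition's raw string values into typed values by evaluating `Cast(Literal(rawValue), dataType)` against the corresponding partition column's data type before packing them into a `ParquetPartition`. A minimal, Spark-free sketch of that idea (the `ColumnType` encoding and `castRawValue` helper are illustrative stand-ins for Catalyst's `DataType` hierarchy and `Cast` expression):

    // Illustrative sketch only: `ColumnType` and `castRawValue` stand in for
    // Catalyst's DataType hierarchy and Cast(Literal(raw), dataType).eval(null).
    object PartitionValueSketch {
      sealed trait ColumnType
      case object IntType extends ColumnType
      case object StringType extends ColumnType

      // Casts one raw Hive partition value (always a string) to its column type.
      def castRawValue(raw: String, columnType: ColumnType): Any = columnType match {
        case IntType    => raw.toInt
        case StringType => raw
      }

      def main(args: Array[String]): Unit = {
        // e.g. a table partitioned by (p INT, day STRING), partition p=1/day=part-1
        val partitionColumnTypes = Seq(IntType, StringType)
        val rawValues            = Seq("1", "part-1")

        // Mirrors the zip-and-cast done when building each ParquetPartition's Row.
        val typedValues = rawValues.zip(partitionColumnTypes).map {
          case (raw, tpe) => castRawValue(raw, tpe)
        }
        println(typedValues) // List(1, part-1)
      }
    }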
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index cb138be90e..965d159656 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -139,15 +139,19 @@ private[hive] trait HiveStrategies {
val partitionLocations = partitions.map(_.getLocation)
- hiveContext
- .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
- .addPartitioningAttributes(relation.partitionKeys)
- .lowerCase
- .where(unresolvedOtherPredicates)
- .select(unresolvedProjection: _*)
- .queryExecution
- .executedPlan
- .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ if (partitionLocations.isEmpty) {
+ PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
+ } else {
+ hiveContext
+ .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
+ .addPartitioningAttributes(relation.partitionKeys)
+ .lowerCase
+ .where(unresolvedOtherPredicates)
+ .select(unresolvedProjection: _*)
+ .queryExecution
+ .executedPlan
+ .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+ }
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.toString)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index e246cbb6d7..2acf1a7767 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -40,7 +40,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
-class ParquetMetastoreSuite extends ParquetPartitioningTest {
+class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -97,6 +97,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
override def afterAll(): Unit = {
+ sql("DROP TABLE partitioned_parquet")
+ sql("DROP TABLE partitioned_parquet_with_key")
+ sql("DROP TABLE normal_parquet")
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
@@ -113,10 +116,38 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}
+class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
+ val originalConf = conf.parquetUseDataSourceApi
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
+class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
+ val originalConf = conf.parquetUseDataSourceApi
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
/**
* A suite of tests for the Parquet support through the data sources API.
*/
-class ParquetSourceSuite extends ParquetPartitioningTest {
+class ParquetSourceSuiteBase extends ParquetPartitioningTest {
override def beforeAll(): Unit = {
super.beforeAll()
@@ -146,6 +177,34 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
}
}
+class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
+ val originalConf = conf.parquetUseDataSourceApi
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
+class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
+ val originalConf = conf.parquetUseDataSourceApi
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ }
+}
+
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
@@ -191,107 +250,99 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
}
}
- def run(prefix: String): Unit = {
- Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
- test(s"$prefix: ordering of the partitioning columns $table") {
- checkAnswer(
- sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
- Seq.fill(10)(Row(1, "part-1"))
- )
-
- checkAnswer(
- sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
- Seq.fill(10)(Row("part-1", 1))
- )
- }
-
- test(s"$prefix: project the partitioning column $table") {
- checkAnswer(
- sql(s"SELECT p, count(*) FROM $table group by p"),
- Row(1, 10) ::
- Row(2, 10) ::
- Row(3, 10) ::
- Row(4, 10) ::
- Row(5, 10) ::
- Row(6, 10) ::
- Row(7, 10) ::
- Row(8, 10) ::
- Row(9, 10) ::
- Row(10, 10) :: Nil
- )
- }
-
- test(s"$prefix: project partitioning and non-partitioning columns $table") {
- checkAnswer(
- sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
- Row("part-1", 1, 10) ::
- Row("part-2", 2, 10) ::
- Row("part-3", 3, 10) ::
- Row("part-4", 4, 10) ::
- Row("part-5", 5, 10) ::
- Row("part-6", 6, 10) ::
- Row("part-7", 7, 10) ::
- Row("part-8", 8, 10) ::
- Row("part-9", 9, 10) ::
- Row("part-10", 10, 10) :: Nil
- )
- }
-
- test(s"$prefix: simple count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table"),
- Row(100))
- }
-
- test(s"$prefix: pruned count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
- Row(10))
- }
-
- test(s"$prefix: non-existent partition $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
- Row(0))
- }
-
- test(s"$prefix: multi-partition pruned count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
- Row(30))
- }
-
- test(s"$prefix: non-partition predicates $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
- Row(30))
- }
-
- test(s"$prefix: sum $table") {
- checkAnswer(
- sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
- Row(1 + 2 + 3))
- }
-
- test(s"$prefix: hive udfs $table") {
- checkAnswer(
- sql(s"SELECT concat(stringField, stringField) FROM $table"),
- sql(s"SELECT stringField FROM $table").map {
- case Row(s: String) => Row(s + s)
- }.collect().toSeq)
- }
+ Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+ test(s"ordering of the partitioning columns $table") {
+ checkAnswer(
+ sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+ Seq.fill(10)(Row(1, "part-1"))
+ )
+
+ checkAnswer(
+ sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+ Seq.fill(10)(Row("part-1", 1))
+ )
+ }
+
+ test(s"project the partitioning column $table") {
+ checkAnswer(
+ sql(s"SELECT p, count(*) FROM $table group by p"),
+ Row(1, 10) ::
+ Row(2, 10) ::
+ Row(3, 10) ::
+ Row(4, 10) ::
+ Row(5, 10) ::
+ Row(6, 10) ::
+ Row(7, 10) ::
+ Row(8, 10) ::
+ Row(9, 10) ::
+ Row(10, 10) :: Nil
+ )
+ }
+
+ test(s"project partitioning and non-partitioning columns $table") {
+ checkAnswer(
+ sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+ Row("part-1", 1, 10) ::
+ Row("part-2", 2, 10) ::
+ Row("part-3", 3, 10) ::
+ Row("part-4", 4, 10) ::
+ Row("part-5", 5, 10) ::
+ Row("part-6", 6, 10) ::
+ Row("part-7", 7, 10) ::
+ Row("part-8", 8, 10) ::
+ Row("part-9", 9, 10) ::
+ Row("part-10", 10, 10) :: Nil
+ )
+ }
+
+ test(s"simple count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table"),
+ Row(100))
}
- test(s"$prefix: $prefix: non-part select(*)") {
+ test(s"pruned count $table") {
checkAnswer(
- sql("SELECT COUNT(*) FROM normal_parquet"),
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
Row(10))
}
- }
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
- run("Parquet data source enabled")
+ test(s"non-existent partition $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+ Row(0))
+ }
+
+ test(s"multi-partition pruned count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+ Row(30))
+ }
+
+ test(s"non-partition predicates $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+ Row(30))
+ }
- setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
- run("Parquet data source disabled")
+ test(s"sum $table") {
+ checkAnswer(
+ sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+ Row(1 + 2 + 3))
+ }
+
+ test(s"hive udfs $table") {
+ checkAnswer(
+ sql(s"SELECT concat(stringField, stringField) FROM $table"),
+ sql(s"SELECT stringField FROM $table").map {
+ case Row(s: String) => Row(s + s)
+ }.collect().toSeq)
+ }
+ }
+
+ test("non-part select(*)") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM normal_parquet"),
+ Row(10))
+ }
}
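The test changes above replace the old `run(prefix)` helper with a base suite holding the shared test bodies plus thin subclasses that flip `SQLConf.PARQUET_USE_DATA_SOURCE_API` in `beforeAll`/`afterAll`, so the same tests run once with the Parquet data source enabled and once with it disabled. A minimal, self-contained ScalaTest sketch of that pattern (the `Conf` object and the single test are stand-ins for SQLConf and the real Parquet query tests):

    // Minimal ScalaTest sketch of the suite layout used above.
    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    object Conf {
      @volatile var parquetUseDataSourceApi: Boolean = true
    }

    abstract class SharedParquetSuiteBase extends FunSuite with BeforeAndAfterAll {
      test("shared test body sees whichever flag the subclass installed") {
        // The real suites run the full set of Parquet query tests here.
        assert(Seq(1, 2, 3).sum == 6)
      }
    }

    class DataSourceOnSuite extends SharedParquetSuiteBase {
      private var original = Conf.parquetUseDataSourceApi
      override def beforeAll(): Unit = {
        super.beforeAll()
        original = Conf.parquetUseDataSourceApi
        Conf.parquetUseDataSourceApi = true
      }
      override def afterAll(): Unit = {
        super.afterAll()
        Conf.parquetUseDataSourceApi = original  // restore the original setting
      }
    }

    class DataSourceOffSuite extends SharedParquetSuiteBase {
      private var original = Conf.parquetUseDataSourceApi
      override def beforeAll(): Unit = {
        super.beforeAll()
        original = Conf.parquetUseDataSourceApi
        Conf.parquetUseDataSourceApi = false
      }
      override def afterAll(): Unit = {
        super.afterAll()
        Conf.parquetUseDataSourceApi = original
      }
    }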