diff options
author | Reynold Xin <rxin@databricks.com> | 2015-01-20 15:16:14 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-01-20 15:16:14 -0800 |
commit | d181c2a1fc40746947b97799b12e7dd8c213fa9c (patch) | |
tree | 5905b15884832311d5402767efe1279759245b08 /sql/hive | |
parent | bc20a52b34e826895d0dcc1d783c021ebd456ebd (diff) | |
download | spark-d181c2a1fc40746947b97799b12e7dd8c213fa9c.tar.gz spark-d181c2a1fc40746947b97799b12e7dd8c213fa9c.tar.bz2 spark-d181c2a1fc40746947b97799b12e7dd8c213fa9c.zip |
[SPARK-5323][SQL] Remove Row's Seq inheritance.
Author: Reynold Xin <rxin@databricks.com>
Closes #4115 from rxin/row-seq and squashes the following commits:
e33abd8 [Reynold Xin] Fixed compilation error.
cceb650 [Reynold Xin] Python test fixes, and removal of WrapDynamic.
0334a52 [Reynold Xin] mkString.
9cdeb7d [Reynold Xin] Hive tests.
15681c2 [Reynold Xin] Fix more test cases.
ea9023a [Reynold Xin] Fixed a catalyst test.
c5e2cb5 [Reynold Xin] Minor patch up.
b9cab7c [Reynold Xin] [SPARK-5323][SQL] Remove Row's Seq inheritance.
Diffstat (limited to 'sql/hive')
14 files changed, 136 insertions, 102 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 10833c1132..3e26fe3675 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -368,10 +368,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { .mkString("\t") } case command: ExecutedCommand => - command.executeCollect().map(_.head.toString) + command.executeCollect().map(_(0).toString) case other => - val result: Seq[Seq[Any]] = other.executeCollect().toSeq + val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq // We need the types so we can output struct field names val types = analyzed.output.map(_.dataType) // Reformat to match hive tab delimited output. @@ -395,7 +395,7 @@ private object HiveContext { protected[sql] def toHiveString(a: (Any, DataType)): String = a match { case (struct: Row, StructType(fields)) => - struct.zip(fields).map { + struct.toSeq.zip(fields).map { case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" }.mkString("{", ",", "}") case (seq: Seq[_], ArrayType(typ, _)) => @@ -418,7 +418,7 @@ private object HiveContext { /** Hive outputs fields of structs slightly differently than top level attributes. */ protected def toHiveStructString(a: (Any, DataType)): String = a match { case (struct: Row, StructType(fields)) => - struct.zip(fields).map { + struct.toSeq.zip(fields).map { case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" }.mkString("{", ",", "}") case (seq: Seq[_], ArrayType(typ, _)) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index eeabfdd857..82dba99900 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -348,7 +348,7 @@ private[hive] trait HiveInspectors { (o: Any) => { if (o != null) { val struct = soi.create() - (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach { + (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach { (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data)) } struct @@ -432,7 +432,7 @@ private[hive] trait HiveInspectors { } case x: SettableStructObjectInspector => val fieldRefs = x.getAllStructFieldRefs - val row = a.asInstanceOf[Seq[_]] + val row = a.asInstanceOf[Row] // 1. create the pojo (most likely) object val result = x.create() var i = 0 @@ -448,7 +448,7 @@ private[hive] trait HiveInspectors { result case x: StructObjectInspector => val fieldRefs = x.getAllStructFieldRefs - val row = a.asInstanceOf[Seq[_]] + val row = a.asInstanceOf[Row] val result = new java.util.ArrayList[AnyRef](fieldRefs.length) var i = 0 while (i < fieldRefs.length) { @@ -475,7 +475,7 @@ private[hive] trait HiveInspectors { } def wrap( - row: Seq[Any], + row: Row, inspectors: Seq[ObjectInspector], cache: Array[AnyRef]): Array[AnyRef] = { var i = 0 @@ -486,6 +486,18 @@ private[hive] trait HiveInspectors { cache } + def wrap( + row: Seq[Any], + inspectors: Seq[ObjectInspector], + cache: Array[AnyRef]): Array[AnyRef] = { + var i = 0 + while (i < inspectors.length) { + cache(i) = wrap(row(i), inspectors(i)) + i += 1 + } + cache + } + /** * @param dataType Catalyst data type * @return Hive java object inspector (recursively), not the Writable ObjectInspector diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index d898b876c3..76d2140372 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -360,7 +360,7 @@ private[hive] case class HiveUdafFunction( protected lazy val cached = new Array[AnyRef](exprs.length) def update(input: Row): Unit = { - val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray + val inputs = inputProjection(input) function.iterate(buffer, wrap(inputs, inspectors, cached)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index cc8bb3e172..aae175e426 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -209,7 +209,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { val dynamicPartPath = dynamicPartColNames - .zip(row.takeRight(dynamicPartColNames.length)) + .zip(row.toSeq.takeRight(dynamicPartColNames.length)) .map { case (col, rawVal) => val string = if (rawVal == null) null else String.valueOf(rawVal) s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala index f89c49d292..f320d732fb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util._ * So, we duplicate this code here. */ class QueryTest extends PlanTest { + /** * Runs the plan and makes sure the answer contains all of the keywords, or the * none of keywords are listed in the answer @@ -56,17 +57,20 @@ class QueryTest extends PlanTest { * @param rdd the [[SchemaRDD]] to be executed * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ]. */ - protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = { - val convertedAnswer = expectedAnswer match { - case s: Seq[_] if s.isEmpty => s - case s: Seq[_] if s.head.isInstanceOf[Product] && - !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq) - case s: Seq[_] => s - case singleItem => Seq(Seq(singleItem)) + protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Seq[Row]): Unit = { + val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty + def prepareAnswer(answer: Seq[Row]): Seq[Row] = { + // Converts data to types that we can do equality comparison using Scala collections. + // For BigDecimal type, the Scala type has a better definition of equality test (similar to + // Java's java.math.BigDecimal.compareTo). + val converted: Seq[Row] = answer.map { s => + Row.fromSeq(s.toSeq.map { + case d: java.math.BigDecimal => BigDecimal(d) + case o => o + }) + } + if (!isSorted) converted.sortBy(_.toString) else converted } - - val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty - def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer val sparkAnswer = try rdd.collect().toSeq catch { case e: Exception => fail( @@ -74,11 +78,12 @@ class QueryTest extends PlanTest { |Exception thrown while executing query: |${rdd.queryExecution} |== Exception == - |${stackTraceToString(e)} + |$e + |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)} """.stripMargin) } - if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) { + if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) { fail(s""" |Results do not match for query: |${rdd.logicalPlan} @@ -88,11 +93,22 @@ class QueryTest extends PlanTest { |${rdd.queryExecution.executedPlan} |== Results == |${sideBySide( - s"== Correct Answer - ${convertedAnswer.size} ==" +: - prepareAnswer(convertedAnswer).map(_.toString), - s"== Spark Answer - ${sparkAnswer.size} ==" +: - prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} + s"== Correct Answer - ${expectedAnswer.size} ==" +: + prepareAnswer(expectedAnswer).map(_.toString), + s"== Spark Answer - ${sparkAnswer.size} ==" +: + prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")} """.stripMargin) } } + + protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Row): Unit = { + checkAnswer(rdd, Seq(expectedAnswer)) + } + + def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = { + test(sqlString) { + checkAnswer(sqlContext.sql(sqlString), expectedAnswer) + } + } + } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala index 4864607252..2d3ff68012 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala @@ -129,6 +129,12 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors { } } + def checkValues(row1: Seq[Any], row2: Row): Unit = { + row1.zip(row2.toSeq).map { + case (r1, r2) => checkValue(r1, r2) + } + } + def checkValue(v1: Any, v2: Any): Unit = { (v1, v2) match { case (r1: Decimal, r2: Decimal) => @@ -198,7 +204,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors { case (t, idx) => StructField(s"c_$idx", t) }) - checkValues(row, unwrap(wrap(row, toInspector(dt)), toInspector(dt)).asInstanceOf[Row]) + checkValues(row, unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[Row]) checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt))) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 7cfb875e05..0e6636d38e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -43,7 +43,7 @@ class InsertIntoHiveTableSuite extends QueryTest { // Make sure the table has also been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.collect().toSeq + testData.collect().toSeq.map(Row.fromTuple) ) // Add more data. @@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest { // Make sure the table has been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.collect().toSeq ++ testData.collect().toSeq + testData.toSchemaRDD.collect().toSeq ++ testData.toSchemaRDD.collect().toSeq ) // Now overwrite. @@ -61,7 +61,7 @@ class InsertIntoHiveTableSuite extends QueryTest { // Make sure the registered table has also been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.collect().toSeq + testData.collect().toSeq.map(Row.fromTuple) ) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 53d8aa7739..7408c7ffd6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -155,7 +155,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM jsonTable"), - ("a", "b") :: Nil) + Row("a", "b")) FileUtils.deleteDirectory(tempDir) sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) @@ -164,14 +164,14 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // will show. checkAnswer( sql("SELECT * FROM jsonTable"), - ("a1", "b1") :: Nil) + Row("a1", "b1")) refreshTable("jsonTable") // Check that the refresh worked checkAnswer( sql("SELECT * FROM jsonTable"), - ("a1", "b1", "c1") :: Nil) + Row("a1", "b1", "c1")) FileUtils.deleteDirectory(tempDir) } @@ -191,7 +191,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM jsonTable"), - ("a", "b") :: Nil) + Row("a", "b")) FileUtils.deleteDirectory(tempDir) sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) @@ -210,7 +210,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // New table should reflect new schema. checkAnswer( sql("SELECT * FROM jsonTable"), - ("a", "b", "c") :: Nil) + Row("a", "b", "c")) FileUtils.deleteDirectory(tempDir) } @@ -253,6 +253,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |) """.stripMargin) - sql("DROP TABLE jsonTable").collect.foreach(println) + sql("DROP TABLE jsonTable").collect().foreach(println) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 0b4e76c9d3..6f07fd5a87 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag -import org.apache.spark.sql.{SQLConf, QueryTest} +import org.apache.spark.sql.{Row, SQLConf, QueryTest} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -141,7 +141,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { before: () => Unit, after: () => Unit, query: String, - expectedAnswer: Seq[Any], + expectedAnswer: Seq[Row], ct: ClassTag[_]) = { before() @@ -183,7 +183,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { /** Tests for MetastoreRelation */ val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key""" - val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238")) + val metastoreAnswer = Seq.fill(4)(Row(238, "val_238", 238, "val_238")) mkTest( () => (), () => (), @@ -197,7 +197,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { val leftSemiJoinQuery = """SELECT * FROM src a |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin - val answer = (86, "val_86") :: Nil + val answer = Row(86, "val_86") var rdd = sql(leftSemiJoinQuery) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c14f0d24e0..df72be7746 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -226,7 +226,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Jdk version leads to different query output for double, so not use createQueryTest here test("division") { val res = sql("SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1").collect().head - Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res).foreach( x => + Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res.toSeq).foreach( x => assert(x._1 == x._2.asInstanceOf[Double])) } @@ -235,7 +235,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("Query expressed in SQL") { setConf("spark.sql.dialect", "sql") - assert(sql("SELECT 1").collect() === Array(Seq(1))) + assert(sql("SELECT 1").collect() === Array(Row(1))) setConf("spark.sql.dialect", "hiveql") } @@ -467,7 +467,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestData(2, "str2") :: Nil) testData.registerTempTable("REGisteredTABle") - assertResult(Array(Array(2, "str2"))) { + assertResult(Array(Row(2, "str2"))) { sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " + "WHERE TableAliaS.a > 1").collect() } @@ -553,12 +553,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Describe a table assertResult( Array( - Array("key", "int", null), - Array("value", "string", null), - Array("dt", "string", null), - Array("# Partition Information", "", ""), - Array("# col_name", "data_type", "comment"), - Array("dt", "string", null)) + Row("key", "int", null), + Row("value", "string", null), + Row("dt", "string", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("dt", "string", null)) ) { sql("DESCRIBE test_describe_commands1") .select('col_name, 'data_type, 'comment) @@ -568,12 +568,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Describe a table with a fully qualified table name assertResult( Array( - Array("key", "int", null), - Array("value", "string", null), - Array("dt", "string", null), - Array("# Partition Information", "", ""), - Array("# col_name", "data_type", "comment"), - Array("dt", "string", null)) + Row("key", "int", null), + Row("value", "string", null), + Row("dt", "string", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("dt", "string", null)) ) { sql("DESCRIBE default.test_describe_commands1") .select('col_name, 'data_type, 'comment) @@ -623,8 +623,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assertResult( Array( - Array("a", "IntegerType", null), - Array("b", "StringType", null)) + Row("a", "IntegerType", null), + Row("b", "StringType", null)) ) { sql("DESCRIBE test_describe_commands2") .select('col_name, 'data_type, 'comment) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 5dafcd6c0a..f2374a2152 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -64,7 +64,7 @@ class HiveUdfSuite extends QueryTest { test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") { checkAnswer( sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"), - 8 + Row(8) ) } @@ -115,7 +115,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'") checkAnswer( sql("SELECT testUDFIntegerToString(i) FROM integerTable"), //.collect(), - Seq(Seq("1"), Seq("2"))) + Seq(Row("1"), Row("2"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString") TestHive.reset() @@ -131,7 +131,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") checkAnswer( sql("SELECT testUDFListListInt(lli) FROM listListIntTable"), //.collect(), - Seq(Seq(0), Seq(2), Seq(13))) + Seq(Row(0), Row(2), Row(13))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt") TestHive.reset() @@ -146,7 +146,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") checkAnswer( sql("SELECT testUDFListString(l) FROM listStringTable"), //.collect(), - Seq(Seq("a,b,c"), Seq("d,e"))) + Seq(Row("a,b,c"), Row("d,e"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString") TestHive.reset() @@ -160,7 +160,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'") checkAnswer( sql("SELECT testStringStringUdf(\"hello\", s) FROM stringTable"), //.collect(), - Seq(Seq("hello world"), Seq("hello goodbye"))) + Seq(Row("hello world"), Row("hello goodbye"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUdf") TestHive.reset() @@ -177,7 +177,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") checkAnswer( sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"), //.collect(), - Seq(Seq("0, 0"), Seq("2, 2"), Seq("13, 13"))) + Seq(Row("0, 0"), Row("2, 2"), Row("13, 13"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") TestHive.reset() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index d41eb9e870..f6bf2dbb5d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -41,7 +41,7 @@ class SQLQuerySuite extends QueryTest { } test("CTAS with serde") { - sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect + sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( """CREATE TABLE ctas2 | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" @@ -51,23 +51,23 @@ class SQLQuerySuite extends QueryTest { | AS | SELECT key, value | FROM src - | ORDER BY key, value""".stripMargin).collect + | ORDER BY key, value""".stripMargin).collect() sql( """CREATE TABLE ctas3 | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012' | STORED AS textfile AS | SELECT key, value | FROM src - | ORDER BY key, value""".stripMargin).collect + | ORDER BY key, value""".stripMargin).collect() // the table schema may like (key: integer, value: string) sql( """CREATE TABLE IF NOT EXISTS ctas4 AS - | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect + | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect() // do nothing cause the table ctas4 already existed. sql( """CREATE TABLE IF NOT EXISTS ctas4 AS - | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect + | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect() checkAnswer( sql("SELECT k, value FROM ctas1 ORDER BY k, value"), @@ -89,7 +89,7 @@ class SQLQuerySuite extends QueryTest { intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] { sql( """CREATE TABLE ctas4 AS - | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect + | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect() } checkAnswer( sql("SELECT key, value FROM ctas4 ORDER BY key, value"), @@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest { sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil).registerTempTable("nested") checkAnswer( sql("SELECT f1.f2.f3 FROM nested"), - 1) + Row(1)) checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"), Seq.empty[Row]) checkAnswer( @@ -233,7 +233,7 @@ class SQLQuerySuite extends QueryTest { | (s struct<innerStruct: struct<s1:string>, | innerArray:array<int>, | innerMap: map<string, int>>) - """.stripMargin).collect + """.stripMargin).collect() sql( """ @@ -243,7 +243,7 @@ class SQLQuerySuite extends QueryTest { checkAnswer( sql("SELECT * FROM nullValuesInInnerComplexTypes"), - Seq(Seq(Seq(null, null, null))) + Row(Row(null, null, null)) ) sql("DROP TABLE nullValuesInInnerComplexTypes") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 4bc14bad0a..581f666399 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -39,7 +39,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest { test("SELECT on Parquet table") { val data = (1 to 4).map(i => (i, s"val_$i")) withParquetTable(data, "t") { - checkAnswer(sql("SELECT * FROM t"), data) + checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala index 8bbb7f2fdb..79fd99d9f8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala @@ -177,81 +177,81 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll test(s"ordering of the partitioning columns $table") { checkAnswer( sql(s"SELECT p, stringField FROM $table WHERE p = 1"), - Seq.fill(10)((1, "part-1")) + Seq.fill(10)(Row(1, "part-1")) ) checkAnswer( sql(s"SELECT stringField, p FROM $table WHERE p = 1"), - Seq.fill(10)(("part-1", 1)) + Seq.fill(10)(Row("part-1", 1)) ) } test(s"project the partitioning column $table") { checkAnswer( sql(s"SELECT p, count(*) FROM $table group by p"), - (1, 10) :: - (2, 10) :: - (3, 10) :: - (4, 10) :: - (5, 10) :: - (6, 10) :: - (7, 10) :: - (8, 10) :: - (9, 10) :: - (10, 10) :: Nil + Row(1, 10) :: + Row(2, 10) :: + Row(3, 10) :: + Row(4, 10) :: + Row(5, 10) :: + Row(6, 10) :: + Row(7, 10) :: + Row(8, 10) :: + Row(9, 10) :: + Row(10, 10) :: Nil ) } test(s"project partitioning and non-partitioning columns $table") { checkAnswer( sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"), - ("part-1", 1, 10) :: - ("part-2", 2, 10) :: - ("part-3", 3, 10) :: - ("part-4", 4, 10) :: - ("part-5", 5, 10) :: - ("part-6", 6, 10) :: - ("part-7", 7, 10) :: - ("part-8", 8, 10) :: - ("part-9", 9, 10) :: - ("part-10", 10, 10) :: Nil + Row("part-1", 1, 10) :: + Row("part-2", 2, 10) :: + Row("part-3", 3, 10) :: + Row("part-4", 4, 10) :: + Row("part-5", 5, 10) :: + Row("part-6", 6, 10) :: + Row("part-7", 7, 10) :: + Row("part-8", 8, 10) :: + Row("part-9", 9, 10) :: + Row("part-10", 10, 10) :: Nil ) } test(s"simple count $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table"), - 100) + Row(100)) } test(s"pruned count $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"), - 10) + Row(10)) } test(s"non-existant partition $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"), - 0) + Row(0)) } test(s"multi-partition pruned count $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"), - 30) + Row(30)) } test(s"non-partition predicates $table") { checkAnswer( sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"), - 30) + Row(30)) } test(s"sum $table") { checkAnswer( sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"), - 1 + 2 + 3) + Row(1 + 2 + 3)) } test(s"hive udfs $table") { @@ -266,6 +266,6 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll test("non-part select(*)") { checkAnswer( sql("SELECT COUNT(*) FROM normal_parquet"), - 10) + Row(10)) } } |