path: root/sql/hive/src
author    Reynold Xin <rxin@databricks.com>    2015-01-20 15:16:14 -0800
committer Michael Armbrust <michael@databricks.com>    2015-01-20 15:16:14 -0800
commit    d181c2a1fc40746947b97799b12e7dd8c213fa9c (patch)
tree      5905b15884832311d5402767efe1279759245b08 /sql/hive/src
parent    bc20a52b34e826895d0dcc1d783c021ebd456ebd (diff)
[SPARK-5323][SQL] Remove Row's Seq inheritance.
Author: Reynold Xin <rxin@databricks.com>

Closes #4115 from rxin/row-seq and squashes the following commits:

e33abd8 [Reynold Xin] Fixed compilation error.
cceb650 [Reynold Xin] Python test fixes, and removal of WrapDynamic.
0334a52 [Reynold Xin] mkString.
9cdeb7d [Reynold Xin] Hive tests.
15681c2 [Reynold Xin] Fix more test cases.
ea9023a [Reynold Xin] Fixed a catalyst test.
c5e2cb5 [Reynold Xin] Minor patch up.
b9cab7c [Reynold Xin] [SPARK-5323][SQL] Remove Row's Seq inheritance.
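In practice, removing the Seq inheritance means a Row can no longer be used directly as a Scala collection, which is why the diff below replaces implicit Seq operations with explicit conversions and constructors. A minimal sketch of the new call pattern (the values are hypothetical; the calls themselves -- toSeq, apply, Row.fromSeq, Row.fromTuple -- are the ones exercised in this patch):

    import org.apache.spark.sql.Row

    val row: Row = Row(1, "part-1")

    // Row no longer extends Seq[Any], so collection operations need an explicit conversion.
    val asSeq: Seq[Any] = row.toSeq

    // Positional access is still available through apply.
    val firstField: Any = row(0)

    // Expected values are built explicitly rather than relying on tuples or raw Seqs.
    val fromSeq   = Row.fromSeq(Seq(1, "part-1"))
    val fromTuple = Row.fromTuple((1, "part-1"))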
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala             | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala                   |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala       |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala                       | 48
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala         |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala   |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala            |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala   | 34
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala     | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala    | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala        |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala           | 58
14 files changed, 136 insertions(+), 102 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 10833c1132..3e26fe3675 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -368,10 +368,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
.mkString("\t")
}
case command: ExecutedCommand =>
- command.executeCollect().map(_.head.toString)
+ command.executeCollect().map(_(0).toString)
case other =>
- val result: Seq[Seq[Any]] = other.executeCollect().toSeq
+ val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
@@ -395,7 +395,7 @@ private object HiveContext {
protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
case (struct: Row, StructType(fields)) =>
- struct.zip(fields).map {
+ struct.toSeq.zip(fields).map {
case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
@@ -418,7 +418,7 @@ private object HiveContext {
/** Hive outputs fields of structs slightly differently than top level attributes. */
protected def toHiveStructString(a: (Any, DataType)): String = a match {
case (struct: Row, StructType(fields)) =>
- struct.zip(fields).map {
+ struct.toSeq.zip(fields).map {
case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index eeabfdd857..82dba99900 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -348,7 +348,7 @@ private[hive] trait HiveInspectors {
(o: Any) => {
if (o != null) {
val struct = soi.create()
- (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
+ (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
(field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
}
struct
@@ -432,7 +432,7 @@ private[hive] trait HiveInspectors {
}
case x: SettableStructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
- val row = a.asInstanceOf[Seq[_]]
+ val row = a.asInstanceOf[Row]
// 1. create the pojo (most likely) object
val result = x.create()
var i = 0
@@ -448,7 +448,7 @@ private[hive] trait HiveInspectors {
result
case x: StructObjectInspector =>
val fieldRefs = x.getAllStructFieldRefs
- val row = a.asInstanceOf[Seq[_]]
+ val row = a.asInstanceOf[Row]
val result = new java.util.ArrayList[AnyRef](fieldRefs.length)
var i = 0
while (i < fieldRefs.length) {
@@ -475,7 +475,7 @@ private[hive] trait HiveInspectors {
}
def wrap(
- row: Seq[Any],
+ row: Row,
inspectors: Seq[ObjectInspector],
cache: Array[AnyRef]): Array[AnyRef] = {
var i = 0
@@ -486,6 +486,18 @@ private[hive] trait HiveInspectors {
cache
}
+ def wrap(
+ row: Seq[Any],
+ inspectors: Seq[ObjectInspector],
+ cache: Array[AnyRef]): Array[AnyRef] = {
+ var i = 0
+ while (i < inspectors.length) {
+ cache(i) = wrap(row(i), inspectors(i))
+ i += 1
+ }
+ cache
+ }
+
/**
* @param dataType Catalyst data type
* @return Hive java object inspector (recursively), not the Writable ObjectInspector
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d898b876c3..76d2140372 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -360,7 +360,7 @@ private[hive] case class HiveUdafFunction(
protected lazy val cached = new Array[AnyRef](exprs.length)
def update(input: Row): Unit = {
- val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
+ val inputs = inputProjection(input)
function.iterate(buffer, wrap(inputs, inspectors, cached))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index cc8bb3e172..aae175e426 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -209,7 +209,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
val dynamicPartPath = dynamicPartColNames
- .zip(row.takeRight(dynamicPartColNames.length))
+ .zip(row.toSeq.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = if (rawVal == null) null else String.valueOf(rawVal)
s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f89c49d292..f320d732fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util._
* So, we duplicate this code here.
*/
class QueryTest extends PlanTest {
+
/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
* none of keywords are listed in the answer
@@ -56,17 +57,20 @@ class QueryTest extends PlanTest {
* @param rdd the [[SchemaRDD]] to be executed
* @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
*/
- protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
- val convertedAnswer = expectedAnswer match {
- case s: Seq[_] if s.isEmpty => s
- case s: Seq[_] if s.head.isInstanceOf[Product] &&
- !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq)
- case s: Seq[_] => s
- case singleItem => Seq(Seq(singleItem))
+ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Seq[Row]): Unit = {
+ val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+ def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
+ // Converts data to types that we can do equality comparison using Scala collections.
+ // For BigDecimal type, the Scala type has a better definition of equality test (similar to
+ // Java's java.math.BigDecimal.compareTo).
+ val converted: Seq[Row] = answer.map { s =>
+ Row.fromSeq(s.toSeq.map {
+ case d: java.math.BigDecimal => BigDecimal(d)
+ case o => o
+ })
+ }
+ if (!isSorted) converted.sortBy(_.toString) else converted
}
-
- val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty
- def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer
val sparkAnswer = try rdd.collect().toSeq catch {
case e: Exception =>
fail(
@@ -74,11 +78,12 @@ class QueryTest extends PlanTest {
|Exception thrown while executing query:
|${rdd.queryExecution}
|== Exception ==
- |${stackTraceToString(e)}
+ |$e
+ |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
""".stripMargin)
}
- if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
+ if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
fail(s"""
|Results do not match for query:
|${rdd.logicalPlan}
@@ -88,11 +93,22 @@ class QueryTest extends PlanTest {
|${rdd.queryExecution.executedPlan}
|== Results ==
|${sideBySide(
- s"== Correct Answer - ${convertedAnswer.size} ==" +:
- prepareAnswer(convertedAnswer).map(_.toString),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ prepareAnswer(expectedAnswer).map(_.toString),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
""".stripMargin)
}
}
+
+ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Row): Unit = {
+ checkAnswer(rdd, Seq(expectedAnswer))
+ }
+
+ def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
+ test(sqlString) {
+ checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+ }
+ }
+
}
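For context, a hypothetical test written against the reworked QueryTest helpers above would now express expected results as Row objects rather than tuples or nested Seqs (the query and values are illustrative only, not taken from this patch):

    checkAnswer(
      sql("SELECT key, value FROM src WHERE key = 86"),
      Row(86, "val_86"))

    checkAnswer(
      sql("SELECT key, value FROM src WHERE key IN (86, 238)"),
      Seq(Row(86, "val_86"), Row(238, "val_238"), Row(238, "val_238")))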
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index 4864607252..2d3ff68012 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -129,6 +129,12 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
}
}
+ def checkValues(row1: Seq[Any], row2: Row): Unit = {
+ row1.zip(row2.toSeq).map {
+ case (r1, r2) => checkValue(r1, r2)
+ }
+ }
+
def checkValue(v1: Any, v2: Any): Unit = {
(v1, v2) match {
case (r1: Decimal, r2: Decimal) =>
@@ -198,7 +204,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
case (t, idx) => StructField(s"c_$idx", t)
})
- checkValues(row, unwrap(wrap(row, toInspector(dt)), toInspector(dt)).asInstanceOf[Row])
+ checkValues(row, unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[Row])
checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 7cfb875e05..0e6636d38e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -43,7 +43,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the table has also been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
+ testData.collect().toSeq.map(Row.fromTuple)
)
// Add more data.
@@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq ++ testData.collect().toSeq
+ testData.toSchemaRDD.collect().toSeq ++ testData.toSchemaRDD.collect().toSeq
)
// Now overwrite.
@@ -61,7 +61,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the registered table has also been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
+ testData.collect().toSeq.map(Row.fromTuple)
)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 53d8aa7739..7408c7ffd6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -155,7 +155,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
- ("a", "b") :: Nil)
+ Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
@@ -164,14 +164,14 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// will show.
checkAnswer(
sql("SELECT * FROM jsonTable"),
- ("a1", "b1") :: Nil)
+ Row("a1", "b1"))
refreshTable("jsonTable")
// Check that the refresh worked
checkAnswer(
sql("SELECT * FROM jsonTable"),
- ("a1", "b1", "c1") :: Nil)
+ Row("a1", "b1", "c1"))
FileUtils.deleteDirectory(tempDir)
}
@@ -191,7 +191,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
- ("a", "b") :: Nil)
+ Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
@@ -210,7 +210,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// New table should reflect new schema.
checkAnswer(
sql("SELECT * FROM jsonTable"),
- ("a", "b", "c") :: Nil)
+ Row("a", "b", "c"))
FileUtils.deleteDirectory(tempDir)
}
@@ -253,6 +253,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
|)
""".stripMargin)
- sql("DROP TABLE jsonTable").collect.foreach(println)
+ sql("DROP TABLE jsonTable").collect().foreach(println)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 0b4e76c9d3..6f07fd5a87 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll
import scala.reflect.ClassTag
-import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.{Row, SQLConf, QueryTest}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -141,7 +141,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
before: () => Unit,
after: () => Unit,
query: String,
- expectedAnswer: Seq[Any],
+ expectedAnswer: Seq[Row],
ct: ClassTag[_]) = {
before()
@@ -183,7 +183,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
/** Tests for MetastoreRelation */
val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
- val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238"))
+ val metastoreAnswer = Seq.fill(4)(Row(238, "val_238", 238, "val_238"))
mkTest(
() => (),
() => (),
@@ -197,7 +197,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
val leftSemiJoinQuery =
"""SELECT * FROM src a
|left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin
- val answer = (86, "val_86") :: Nil
+ val answer = Row(86, "val_86")
var rdd = sql(leftSemiJoinQuery)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index c14f0d24e0..df72be7746 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -226,7 +226,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
// Jdk version leads to different query output for double, so not use createQueryTest here
test("division") {
val res = sql("SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1").collect().head
- Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res).foreach( x =>
+ Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res.toSeq).foreach( x =>
assert(x._1 == x._2.asInstanceOf[Double]))
}
@@ -235,7 +235,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("Query expressed in SQL") {
setConf("spark.sql.dialect", "sql")
- assert(sql("SELECT 1").collect() === Array(Seq(1)))
+ assert(sql("SELECT 1").collect() === Array(Row(1)))
setConf("spark.sql.dialect", "hiveql")
}
@@ -467,7 +467,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestData(2, "str2") :: Nil)
testData.registerTempTable("REGisteredTABle")
- assertResult(Array(Array(2, "str2"))) {
+ assertResult(Array(Row(2, "str2"))) {
sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " +
"WHERE TableAliaS.a > 1").collect()
}
@@ -553,12 +553,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
// Describe a table
assertResult(
Array(
- Array("key", "int", null),
- Array("value", "string", null),
- Array("dt", "string", null),
- Array("# Partition Information", "", ""),
- Array("# col_name", "data_type", "comment"),
- Array("dt", "string", null))
+ Row("key", "int", null),
+ Row("value", "string", null),
+ Row("dt", "string", null),
+ Row("# Partition Information", "", ""),
+ Row("# col_name", "data_type", "comment"),
+ Row("dt", "string", null))
) {
sql("DESCRIBE test_describe_commands1")
.select('col_name, 'data_type, 'comment)
@@ -568,12 +568,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
// Describe a table with a fully qualified table name
assertResult(
Array(
- Array("key", "int", null),
- Array("value", "string", null),
- Array("dt", "string", null),
- Array("# Partition Information", "", ""),
- Array("# col_name", "data_type", "comment"),
- Array("dt", "string", null))
+ Row("key", "int", null),
+ Row("value", "string", null),
+ Row("dt", "string", null),
+ Row("# Partition Information", "", ""),
+ Row("# col_name", "data_type", "comment"),
+ Row("dt", "string", null))
) {
sql("DESCRIBE default.test_describe_commands1")
.select('col_name, 'data_type, 'comment)
@@ -623,8 +623,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assertResult(
Array(
- Array("a", "IntegerType", null),
- Array("b", "StringType", null))
+ Row("a", "IntegerType", null),
+ Row("b", "StringType", null))
) {
sql("DESCRIBE test_describe_commands2")
.select('col_name, 'data_type, 'comment)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 5dafcd6c0a..f2374a2152 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -64,7 +64,7 @@ class HiveUdfSuite extends QueryTest {
test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") {
checkAnswer(
sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"),
- 8
+ Row(8)
)
}
@@ -115,7 +115,7 @@ class HiveUdfSuite extends QueryTest {
sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'")
checkAnswer(
sql("SELECT testUDFIntegerToString(i) FROM integerTable"), //.collect(),
- Seq(Seq("1"), Seq("2")))
+ Seq(Row("1"), Row("2")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString")
TestHive.reset()
@@ -131,7 +131,7 @@ class HiveUdfSuite extends QueryTest {
sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
checkAnswer(
sql("SELECT testUDFListListInt(lli) FROM listListIntTable"), //.collect(),
- Seq(Seq(0), Seq(2), Seq(13)))
+ Seq(Row(0), Row(2), Row(13)))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt")
TestHive.reset()
@@ -146,7 +146,7 @@ class HiveUdfSuite extends QueryTest {
sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
checkAnswer(
sql("SELECT testUDFListString(l) FROM listStringTable"), //.collect(),
- Seq(Seq("a,b,c"), Seq("d,e")))
+ Seq(Row("a,b,c"), Row("d,e")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString")
TestHive.reset()
@@ -160,7 +160,7 @@ class HiveUdfSuite extends QueryTest {
sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'")
checkAnswer(
sql("SELECT testStringStringUdf(\"hello\", s) FROM stringTable"), //.collect(),
- Seq(Seq("hello world"), Seq("hello goodbye")))
+ Seq(Row("hello world"), Row("hello goodbye")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUdf")
TestHive.reset()
@@ -177,7 +177,7 @@ class HiveUdfSuite extends QueryTest {
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
checkAnswer(
sql("SELECT testUDFTwoListList(lli, lli) FROM TwoListTable"), //.collect(),
- Seq(Seq("0, 0"), Seq("2, 2"), Seq("13, 13")))
+ Seq(Row("0, 0"), Row("2, 2"), Row("13, 13")))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
TestHive.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d41eb9e870..f6bf2dbb5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -41,7 +41,7 @@ class SQLQuerySuite extends QueryTest {
}
test("CTAS with serde") {
- sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
+ sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(
"""CREATE TABLE ctas2
| ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
@@ -51,23 +51,23 @@ class SQLQuerySuite extends QueryTest {
| AS
| SELECT key, value
| FROM src
- | ORDER BY key, value""".stripMargin).collect
+ | ORDER BY key, value""".stripMargin).collect()
sql(
"""CREATE TABLE ctas3
| ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012'
| STORED AS textfile AS
| SELECT key, value
| FROM src
- | ORDER BY key, value""".stripMargin).collect
+ | ORDER BY key, value""".stripMargin).collect()
// the table schema may like (key: integer, value: string)
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
- | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
+ | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect()
// do nothing cause the table ctas4 already existed.
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
- | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+ | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect()
checkAnswer(
sql("SELECT k, value FROM ctas1 ORDER BY k, value"),
@@ -89,7 +89,7 @@ class SQLQuerySuite extends QueryTest {
intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
sql(
"""CREATE TABLE ctas4 AS
- | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+ | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect()
}
checkAnswer(
sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
@@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest {
sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil).registerTempTable("nested")
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
- 1)
+ Row(1))
checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
Seq.empty[Row])
checkAnswer(
@@ -233,7 +233,7 @@ class SQLQuerySuite extends QueryTest {
| (s struct<innerStruct: struct<s1:string>,
| innerArray:array<int>,
| innerMap: map<string, int>>)
- """.stripMargin).collect
+ """.stripMargin).collect()
sql(
"""
@@ -243,7 +243,7 @@ class SQLQuerySuite extends QueryTest {
checkAnswer(
sql("SELECT * FROM nullValuesInInnerComplexTypes"),
- Seq(Seq(Seq(null, null, null)))
+ Row(Row(null, null, null))
)
sql("DROP TABLE nullValuesInInnerComplexTypes")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 4bc14bad0a..581f666399 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -39,7 +39,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
test("SELECT on Parquet table") {
val data = (1 to 4).map(i => (i, s"val_$i"))
withParquetTable(data, "t") {
- checkAnswer(sql("SELECT * FROM t"), data)
+ checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 8bbb7f2fdb..79fd99d9f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -177,81 +177,81 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
- Seq.fill(10)((1, "part-1"))
+ Seq.fill(10)(Row(1, "part-1"))
)
checkAnswer(
sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
- Seq.fill(10)(("part-1", 1))
+ Seq.fill(10)(Row("part-1", 1))
)
}
test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),
- (1, 10) ::
- (2, 10) ::
- (3, 10) ::
- (4, 10) ::
- (5, 10) ::
- (6, 10) ::
- (7, 10) ::
- (8, 10) ::
- (9, 10) ::
- (10, 10) :: Nil
+ Row(1, 10) ::
+ Row(2, 10) ::
+ Row(3, 10) ::
+ Row(4, 10) ::
+ Row(5, 10) ::
+ Row(6, 10) ::
+ Row(7, 10) ::
+ Row(8, 10) ::
+ Row(9, 10) ::
+ Row(10, 10) :: Nil
)
}
test(s"project partitioning and non-partitioning columns $table") {
checkAnswer(
sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
- ("part-1", 1, 10) ::
- ("part-2", 2, 10) ::
- ("part-3", 3, 10) ::
- ("part-4", 4, 10) ::
- ("part-5", 5, 10) ::
- ("part-6", 6, 10) ::
- ("part-7", 7, 10) ::
- ("part-8", 8, 10) ::
- ("part-9", 9, 10) ::
- ("part-10", 10, 10) :: Nil
+ Row("part-1", 1, 10) ::
+ Row("part-2", 2, 10) ::
+ Row("part-3", 3, 10) ::
+ Row("part-4", 4, 10) ::
+ Row("part-5", 5, 10) ::
+ Row("part-6", 6, 10) ::
+ Row("part-7", 7, 10) ::
+ Row("part-8", 8, 10) ::
+ Row("part-9", 9, 10) ::
+ Row("part-10", 10, 10) :: Nil
)
}
test(s"simple count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table"),
- 100)
+ Row(100))
}
test(s"pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
- 10)
+ Row(10))
}
test(s"non-existant partition $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
- 0)
+ Row(0))
}
test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
- 30)
+ Row(30))
}
test(s"non-partition predicates $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
- 30)
+ Row(30))
}
test(s"sum $table") {
checkAnswer(
sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
- 1 + 2 + 3)
+ Row(1 + 2 + 3))
}
test(s"hive udfs $table") {
@@ -266,6 +266,6 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),
- 10)
+ Row(10))
}
}