path: root/sql/hive
author     Cheng Lian <lian@databricks.com>    2015-02-17 23:36:20 -0800
committer  Reynold Xin <rxin@databricks.com>   2015-02-17 23:36:20 -0800
commit     61ab08549cb6fceb6de1b5c490c55a89d4bd28fa
tree       172480f72bb84b685b39acd1e744c84a5dc42d40 /sql/hive
parent     3912d332464dcd124c60b734724c34d9742466a4
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame related style issues.

Author: Cheng Lian <lian@databricks.com>

Closes #4670 from liancheng/df-cleanup and squashes the following commits:

3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls
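For context, the sketch below is not part of the patch; the case class, object, and table names are illustrative, and it assumes Spark 1.3-era APIs. It shows the two conventions the commit enforces across the Hive test suites: DataFrame-typed locals are named `df` rather than `rdd`/`srdd`, and `toDF` is always called with explicit parentheses, which keeps it visually distinct from the column-renaming overload `toDF(colNames: String*)`.

```scala
// A minimal, self-contained sketch (not part of this patch) of the style being applied.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical case class, defined at top level so Scala reflection can derive the schema.
case class TestData(key: Int, value: String)

object ToDFConventionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toDF-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Old style in the test suites: `val rdd = ...toDF` (no parentheses).
    // New style: name the DataFrame local `df` and call `toDF()` explicitly,
    // leaving `toDF("key", "value")` for the case where columns are renamed.
    val df = sc.parallelize((1 to 100).map(i => TestData(i, i.toString))).toDF()
    df.registerTempTable("testData")

    val result = sqlContext.sql("SELECT key, value FROM testData WHERE key <= 3")
    result.collect().foreach(println)

    sc.stop()
  }
}
```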
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala        4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala        8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala                 38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala        20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala    6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala          18
6 files changed, 47 insertions(+), 47 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 9fcb04ca23..d4b175fa44 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -37,7 +37,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._
val testData = TestHive.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF
+ (1 to 100).map(i => TestData(i, i.toString))).toDF()
before {
// Since every we are doing tests for DDL statements,
@@ -65,7 +65,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
+ testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
)
// Now overwrite.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e5156ae821..0bd82773f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
test("check change without refresh") {
val tempDir = File.createTempFile("sparksql", "json")
tempDir.delete()
- sparkContext.parallelize(("a", "b") :: Nil).toDF
+ sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql(
@@ -171,7 +171,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
- sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF
+ sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
// Schema is cached so the new column does not show. The updated values in existing columns
@@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
test("drop, change, recreate") {
val tempDir = File.createTempFile("sparksql", "json")
tempDir.delete()
- sparkContext.parallelize(("a", "b") :: Nil).toDF
+ sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql(
@@ -209,7 +209,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
- sparkContext.parallelize(("a", "b", "c") :: Nil).toDF
+ sparkContext.parallelize(("a", "b", "c") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql("DROP TABLE jsonTable")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6f07fd5a87..1e05a024b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -127,11 +127,11 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
}
test("estimates the size of a test MetastoreRelation") {
- val rdd = sql("""SELECT * FROM src""")
- val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+ val df = sql("""SELECT * FROM src""")
+ val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.statistics.sizeInBytes
}
- assert(sizes.size === 1, s"Size wrong for:\n ${rdd.queryExecution}")
+ assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(5812)),
s"expected exact size 5812 for test table 'src', got: ${sizes(0)}")
}
@@ -145,10 +145,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
ct: ClassTag[_]) = {
before()
- var rdd = sql(query)
+ var df = sql(query)
// Assert src has a size smaller than the threshold.
- val sizes = rdd.queryExecution.analyzed.collect {
+ val sizes = df.queryExecution.analyzed.collect {
case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
}
assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold
@@ -157,21 +157,21 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.size === 1,
- s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+ s"actual query plans do not contain broadcast join: ${df.queryExecution}")
- checkAnswer(rdd, expectedAnswer) // check correctness of output
+ checkAnswer(df, expectedAnswer) // check correctness of output
TestHive.conf.settings.synchronized {
val tmp = conf.autoBroadcastJoinThreshold
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
- rdd = sql(query)
- bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ df = sql(query)
+ bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+ val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
assert(shj.size === 1,
"ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
@@ -199,10 +199,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
|left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin
val answer = Row(86, "val_86")
- var rdd = sql(leftSemiJoinQuery)
+ var df = sql(leftSemiJoinQuery)
// Assert src has a size smaller than the threshold.
- val sizes = rdd.queryExecution.analyzed.collect {
+ val sizes = df.queryExecution.analyzed.collect {
case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass
.isAssignableFrom(r.getClass) =>
r.statistics.sizeInBytes
@@ -213,25 +213,25 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = rdd.queryExecution.sparkPlan.collect {
+ var bhj = df.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.size === 1,
- s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+ s"actual query plans do not contain broadcast join: ${df.queryExecution}")
- checkAnswer(rdd, answer) // check correctness of output
+ checkAnswer(df, answer) // check correctness of output
TestHive.conf.settings.synchronized {
val tmp = conf.autoBroadcastJoinThreshold
sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
- rdd = sql(leftSemiJoinQuery)
- bhj = rdd.queryExecution.sparkPlan.collect {
+ df = sql(leftSemiJoinQuery)
+ bhj = df.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = rdd.queryExecution.sparkPlan.collect {
+ val shj = df.queryExecution.sparkPlan.collect {
case j: LeftSemiJoinHash => j
}
assert(shj.size === 1,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 955f3f51cf..bb0a67dc03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -429,7 +429,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES
|('serialization.last.column.takes.rest'='true') FROM src;
""".stripMargin.replaceAll("\n", " "))
-
+
createQueryTest("LIKE",
"SELECT * FROM src WHERE value LIKE '%1%'")
@@ -567,7 +567,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(2, "str2") :: Nil)
- testData.toDF.registerTempTable("REGisteredTABle")
+ testData.toDF().registerTempTable("REGisteredTABle")
assertResult(Array(Row(2, "str2"))) {
sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " +
@@ -583,8 +583,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-1704: Explain commands as a DataFrame") {
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- val rdd = sql("explain select key, count(value) from src group by key")
- assert(isExplanation(rdd))
+ val df = sql("explain select key, count(value) from src group by key")
+ assert(isExplanation(df))
TestHive.reset()
}
@@ -592,7 +592,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") {
val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
.zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)}
- TestHive.sparkContext.parallelize(fixture).toDF.registerTempTable("having_test")
+ TestHive.sparkContext.parallelize(fixture).toDF().registerTempTable("having_test")
val results =
sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
.collect()
@@ -740,7 +740,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(1, "str2") :: Nil)
- testData.toDF.registerTempTable("test_describe_commands2")
+ testData.toDF().registerTempTable("test_describe_commands2")
assertResult(
Array(
@@ -900,8 +900,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
- sparkContext.makeRDD(Seq.empty[LogEntry]).toDF.registerTempTable("rawLogs")
- sparkContext.makeRDD(Seq.empty[LogFile]).toDF.registerTempTable("logFiles")
+ sparkContext.makeRDD(Seq.empty[LogEntry]).toDF().registerTempTable("rawLogs")
+ sparkContext.makeRDD(Seq.empty[LogFile]).toDF().registerTempTable("logFiles")
sql(
"""
@@ -979,8 +979,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
val KV = "([^=]+)=([^=]*)".r
- def collectResults(rdd: DataFrame): Set[(String, String)] =
- rdd.collect().map {
+ def collectResults(df: DataFrame): Set[(String, String)] =
+ df.collect().map {
case Row(key: String, value: String) => key -> value
case Row(KV(key, value)) => key -> value
}.toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 6fc4cc1426..f4440e5b78 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest {
test("case insensitivity with scala reflection") {
// Test resolution with Scala Reflection
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("caseSensitivityTest")
+ .toDF().registerTempTable("caseSensitivityTest")
val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"),
@@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
ignore("case insensitivity with scala reflection joins") {
// Test resolution with Scala Reflection
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("caseSensitivityTest")
+ .toDF().registerTempTable("caseSensitivityTest")
sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect()
}
test("nested repeated resolution") {
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("nestedRepeatedTest")
+ .toDF().registerTempTable("nestedRepeatedTest")
assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 245161d2eb..cb405f56bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest {
| getStruct(1).f5 FROM src LIMIT 1
""".stripMargin).head() === Row(1, 2, 3, 4, 5))
}
-
+
test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") {
checkAnswer(
sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"),
@@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest {
test("SPARK-2693 udaf aggregates test") {
checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src").collect().toSeq)
-
+
checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"),
sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq)
}
@@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest {
test("Generic UDAF aggregates") {
checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999)) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)
-
+
checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"),
sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq)
}
-
+
test("UDFIntegerToString") {
val testData = TestHive.sparkContext.parallelize(
- IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF
+ IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
testData.registerTempTable("integerTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'")
@@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest {
val testData = TestHive.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
- ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF
+ ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
testData.registerTempTable("listListIntTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
@@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest {
test("UDFListString") {
val testData = TestHive.sparkContext.parallelize(
ListStringCaseClass(Seq("a", "b", "c")) ::
- ListStringCaseClass(Seq("d", "e")) :: Nil).toDF
+ ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
testData.registerTempTable("listStringTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
@@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest {
test("UDFStringString") {
val testData = TestHive.sparkContext.parallelize(
- StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF
+ StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
testData.registerTempTable("stringTable")
sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'")
@@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest {
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
- Nil).toDF
+ Nil).toDF()
testData.registerTempTable("TwoListTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")