diff options
author | Cheng Lian <lian@databricks.com> | 2015-02-17 23:36:20 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-17 23:36:20 -0800 |
commit | 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa (patch) | |
tree | 172480f72bb84b685b39acd1e744c84a5dc42d40 /sql/hive | |
parent | 3912d332464dcd124c60b734724c34d9742466a4 (diff) | |
download | spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.tar.gz spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.tar.bz2 spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.zip |
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame related style issues.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4670)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #4670 from liancheng/df-cleanup and squashes the following commits:
3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls
Diffstat (limited to 'sql/hive')
6 files changed, 47 insertions, 47 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 9fcb04ca23..d4b175fa44 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -37,7 +37,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { import org.apache.spark.sql.hive.test.TestHive.implicits._ val testData = TestHive.sparkContext.parallelize( - (1 to 100).map(i => TestData(i, i.toString))).toDF + (1 to 100).map(i => TestData(i, i.toString))).toDF() before { // Since every we are doing tests for DDL statements, @@ -65,7 +65,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { // Make sure the table has been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq + testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq ) // Now overwrite. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e5156ae821..0bd82773f3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("check change without refresh") { val tempDir = File.createTempFile("sparksql", "json") tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF + sparkContext.parallelize(("a", "b") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql( @@ -171,7 +171,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Row("a", "b")) FileUtils.deleteDirectory(tempDir) - sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF + sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) // Schema is cached so the new column does not show. The updated values in existing columns @@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("drop, change, recreate") { val tempDir = File.createTempFile("sparksql", "json") tempDir.delete() - sparkContext.parallelize(("a", "b") :: Nil).toDF + sparkContext.parallelize(("a", "b") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql( @@ -209,7 +209,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Row("a", "b")) FileUtils.deleteDirectory(tempDir) - sparkContext.parallelize(("a", "b", "c") :: Nil).toDF + sparkContext.parallelize(("a", "b", "c") :: Nil).toDF() .toJSON.saveAsTextFile(tempDir.getCanonicalPath) sql("DROP TABLE jsonTable") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 6f07fd5a87..1e05a024b8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -127,11 +127,11 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } test("estimates the size of a test MetastoreRelation") { - val rdd = sql("""SELECT * FROM src""") - val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => + val df = sql("""SELECT * FROM src""") + val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation => mr.statistics.sizeInBytes } - assert(sizes.size === 1, s"Size wrong for:\n ${rdd.queryExecution}") + assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes(0).equals(BigInt(5812)), s"expected exact size 5812 for test table 'src', got: ${sizes(0)}") } @@ -145,10 +145,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { ct: ClassTag[_]) = { before() - var rdd = sql(query) + var df = sql(query) // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { + val sizes = df.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold @@ -157,21 +157,21 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(rdd, expectedAnswer) // check correctness of output + checkAnswer(df, expectedAnswer) // check correctness of output TestHive.conf.settings.synchronized { val tmp = conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") - rdd = sql(query) - bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + df = sql(query) + bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j } + val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j } assert(shj.size === 1, "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off") @@ -199,10 +199,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin val answer = Row(86, "val_86") - var rdd = sql(leftSemiJoinQuery) + var df = sql(leftSemiJoinQuery) // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { + val sizes = df.queryExecution.analyzed.collect { case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes @@ -213,25 +213,25 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { + var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastLeftSemiJoinHash => j } assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(rdd, answer) // check correctness of output + checkAnswer(df, answer) // check correctness of output TestHive.conf.settings.synchronized { val tmp = conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") - rdd = sql(leftSemiJoinQuery) - bhj = rdd.queryExecution.sparkPlan.collect { + df = sql(leftSemiJoinQuery) + bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastLeftSemiJoinHash => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = rdd.queryExecution.sparkPlan.collect { + val shj = df.queryExecution.sparkPlan.collect { case j: LeftSemiJoinHash => j } assert(shj.size === 1, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 955f3f51cf..bb0a67dc03 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -429,7 +429,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES |('serialization.last.column.takes.rest'='true') FROM src; """.stripMargin.replaceAll("\n", " ")) - + createQueryTest("LIKE", "SELECT * FROM src WHERE value LIKE '%1%'") @@ -567,7 +567,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(2, "str2") :: Nil) - testData.toDF.registerTempTable("REGisteredTABle") + testData.toDF().registerTempTable("REGisteredTABle") assertResult(Array(Row(2, "str2"))) { sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " + @@ -583,8 +583,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("SPARK-1704: Explain commands as a DataFrame") { sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - val rdd = sql("explain select key, count(value) from src group by key") - assert(isExplanation(rdd)) + val df = sql("explain select key, count(value) from src group by key") + assert(isExplanation(df)) TestHive.reset() } @@ -592,7 +592,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") { val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3)) .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)} - TestHive.sparkContext.parallelize(fixture).toDF.registerTempTable("having_test") + TestHive.sparkContext.parallelize(fixture).toDF().registerTempTable("having_test") val results = sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3") .collect() @@ -740,7 +740,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(1, "str2") :: Nil) - testData.toDF.registerTempTable("test_describe_commands2") + testData.toDF().registerTempTable("test_describe_commands2") assertResult( Array( @@ -900,8 +900,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { - sparkContext.makeRDD(Seq.empty[LogEntry]).toDF.registerTempTable("rawLogs") - sparkContext.makeRDD(Seq.empty[LogFile]).toDF.registerTempTable("logFiles") + sparkContext.makeRDD(Seq.empty[LogEntry]).toDF().registerTempTable("rawLogs") + sparkContext.makeRDD(Seq.empty[LogFile]).toDF().registerTempTable("logFiles") sql( """ @@ -979,8 +979,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { val testVal = "test.val.0" val nonexistentKey = "nonexistent" val KV = "([^=]+)=([^=]*)".r - def collectResults(rdd: DataFrame): Set[(String, String)] = - rdd.collect().map { + def collectResults(df: DataFrame): Set[(String, String)] = + df.collect().map { case Row(key: String, value: String) => key -> value case Row(KV(key, value)) => key -> value }.toSet diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 6fc4cc1426..f4440e5b78 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest { test("case insensitivity with scala reflection") { // Test resolution with Scala Reflection sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("caseSensitivityTest") + .toDF().registerTempTable("caseSensitivityTest") val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"), @@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest { ignore("case insensitivity with scala reflection joins") { // Test resolution with Scala Reflection sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("caseSensitivityTest") + .toDF().registerTempTable("caseSensitivityTest") sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect() } test("nested repeated resolution") { sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil) - .toDF.registerTempTable("nestedRepeatedTest") + .toDF().registerTempTable("nestedRepeatedTest") assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 245161d2eb..cb405f56bf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest { | getStruct(1).f5 FROM src LIMIT 1 """.stripMargin).head() === Row(1, 2, 3, 4, 5)) } - + test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") { checkAnswer( sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"), @@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest { test("SPARK-2693 udaf aggregates test") { checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"), sql("SELECT max(key) FROM src").collect().toSeq) - + checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"), sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) } @@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest { test("Generic UDAF aggregates") { checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) - + checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"), sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq) } - + test("UDFIntegerToString") { val testData = TestHive.sparkContext.parallelize( - IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF + IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() testData.registerTempTable("integerTable") sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'") @@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: - ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF + ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() testData.registerTempTable("listListIntTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") @@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest { test("UDFListString") { val testData = TestHive.sparkContext.parallelize( ListStringCaseClass(Seq("a", "b", "c")) :: - ListStringCaseClass(Seq("d", "e")) :: Nil).toDF + ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() testData.registerTempTable("listStringTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") @@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest { test("UDFStringString") { val testData = TestHive.sparkContext.parallelize( - StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF + StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() testData.registerTempTable("stringTable") sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'") @@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest { ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: - Nil).toDF + Nil).toDF() testData.registerTempTable("TwoListTable") sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") |