author     Cheng Lian <lian@databricks.com>   2015-02-17 23:36:20 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-02-17 23:36:20 -0800
commit     61ab08549cb6fceb6de1b5c490c55a89d4bd28fa (patch)
tree       172480f72bb84b685b39acd1e744c84a5dc42d40 /sql
parent     3912d332464dcd124c60b734724c34d9742466a4 (diff)
download   spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.tar.gz
           spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.tar.bz2
           spark-61ab08549cb6fceb6de1b5c490c55a89d4bd28fa.zip
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame-related style issues.

Author: Cheng Lian <lian@databricks.com>

Closes #4670 from liancheng/df-cleanup and squashes the following commits:

3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls
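To make the convention concrete, here is a minimal sketch (not taken from the patch) of the style this cleanup enforces, assuming a Spark 1.3-era SQLContext and a hypothetical `Person` case class: local DataFrames are named `df` rather than `rdd`/`srdd`, and the nullary conversion is written `toDF()` with parentheses.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

case class Person(id: Int, name: String)

// Illustrative only: `sqlContext` is assumed to exist in the caller's scope.
def peopleDF(sqlContext: SQLContext): DataFrame = {
  import sqlContext.implicits._
  val people = sqlContext.sparkContext.parallelize(Seq(Person(1, "ann"), Person(2, "bob")))
  // Before this cleanup: `val rdd = people.toDF` -- a DataFrame misleadingly named `rdd`.
  val df = people.toDF()  // after: the name reflects the type, and the call has parentheses
  df
}
```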
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala  26
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala  46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TestData.scala  48
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala  46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala  53
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala  30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala  40
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala  28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala  38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala  18
23 files changed, 229 insertions, 238 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index fa5fe84263..5007a5a34d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -124,7 +124,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
* {{{
* val rdd: RDD[(Int, String)] = ...
- * rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2
+ * rdd.toDF() // this implicit conversion creates a DataFrame with column name _1 and _2
* rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
* @group basic
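A usage sketch of the two forms described in the Scaladoc above, assuming `sc` is an existing SparkContext and the SQLContext implicits are imported; the names here are illustrative only.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def tupleConversions(sc: SparkContext, sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._
  val rdd = sc.parallelize(Seq((1, "ann"), (2, "bob")))
  val unnamed = rdd.toDF()             // columns default to _1 and _2
  val named   = rdd.toDF("id", "name") // columns are "id" and "name"
  unnamed.printSchema()
  named.printSchema()
}
```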
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 709b350144..db32fa80dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -887,8 +887,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
*/
- private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = {
- catalog.registerTable(Seq(tableName), rdd.logicalPlan)
+ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
+ catalog.registerTable(Seq(tableName), df.logicalPlan)
}
/**
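`registerDataFrameAsTable` is `private[sql]`; the public route to it is `DataFrame.registerTempTable`, which the test changes below use throughout. A minimal sketch, with a hypothetical table name:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Sketch only: register a DataFrame under a temporary name and query it back.
def asTempTable(sqlContext: SQLContext, df: DataFrame): DataFrame = {
  df.registerTempTable("people")          // hypothetical name, scoped to this SQLContext
  sqlContext.sql("SELECT * FROM people")
}
```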
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 052728c5d5..0fa2fe90f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -99,7 +99,7 @@ private[sql] trait ParquetTest {
* Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
* which is then passed to `f`. The Parquet file will be deleted after `f` returns.
*/
- protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
+ protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
withParquetFile(data)(path => f(sqlContext.parquetFile(path)))
@@ -120,8 +120,8 @@ private[sql] trait ParquetTest {
protected def withParquetTable[T <: Product: ClassTag: TypeTag]
(data: Seq[T], tableName: String)
(f: => Unit): Unit = {
- withParquetRDD(data) { rdd =>
- sqlContext.registerDataFrameAsTable(rdd, tableName)
+ withParquetDataFrame(data) { df =>
+ sqlContext.registerDataFrameAsTable(df, tableName)
withTempTable(tableName)(f)
}
}
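`withParquetDataFrame` and `withParquetTable` are loan-pattern helpers: build a resource, lend it to the test body, clean up afterwards. A generic standalone sketch of the same pattern (not Spark's implementation):

```scala
import java.io.File
import java.nio.file.Files

// Lend a fresh temp directory to `f`, then delete it even if the body throws.
def withTempDir[T](f: File => T): T = {
  val dir = Files.createTempDirectory("loan-pattern-test").toFile
  try f(dir) finally {
    Option(dir.listFiles()).foreach(_.foreach(_.delete()))
    dir.delete()
  }
}
```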
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 691dae0a05..e70e866fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,17 +18,15 @@
package org.apache.spark.sql
import scala.concurrent.duration._
-import scala.language.implicitConversions
-import scala.language.postfixOps
+import scala.language.{implicitConversions, postfixOps}
import org.scalatest.concurrent.Eventually._
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.columnar._
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
-import org.apache.spark.storage.{StorageLevel, RDDBlockId}
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
case class BigData(s: String)
@@ -59,15 +57,15 @@ class CachedTableSuite extends QueryTest {
test("unpersist an uncached table will not raise exception") {
assert(None == cacheManager.lookupCachedData(testData))
- testData.unpersist(true)
+ testData.unpersist(blocking = true)
assert(None == cacheManager.lookupCachedData(testData))
- testData.unpersist(false)
+ testData.unpersist(blocking = false)
assert(None == cacheManager.lookupCachedData(testData))
testData.persist()
assert(None != cacheManager.lookupCachedData(testData))
- testData.unpersist(true)
+ testData.unpersist(blocking = true)
assert(None == cacheManager.lookupCachedData(testData))
- testData.unpersist(false)
+ testData.unpersist(blocking = false)
assert(None == cacheManager.lookupCachedData(testData))
}
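The change above is purely about readability at the call site: a bare boolean argument says nothing, while a named argument documents intent. A small sketch, assuming any cached DataFrame:

```scala
import org.apache.spark.sql.DataFrame

// Sketch only: the named form reads unambiguously at the call site.
def uncache(df: DataFrame): Unit = {
  df.unpersist(blocking = true) // block until the cached data is actually removed
}
```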
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 0da619def1..f31bc38922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import org.apache.spark.sql.TestData._
-
import scala.language.postfixOps
import org.apache.spark.sql.functions._
@@ -251,20 +249,20 @@ class DataFrameSuite extends QueryTest {
Seq(Row(3,1), Row(3,2), Row(2,1), Row(2,2), Row(1,1), Row(1,2)))
checkAnswer(
- arrayData.toDF.orderBy('data.getItem(0).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)
+ arrayData.toDF().orderBy('data.getItem(0).asc),
+ arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)
checkAnswer(
- arrayData.toDF.orderBy('data.getItem(0).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)
+ arrayData.toDF().orderBy('data.getItem(0).desc),
+ arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)
checkAnswer(
- arrayData.toDF.orderBy('data.getItem(1).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)
+ arrayData.toDF().orderBy('data.getItem(1).asc),
+ arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)
checkAnswer(
- arrayData.toDF.orderBy('data.getItem(1).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
+ arrayData.toDF().orderBy('data.getItem(1).desc),
+ arrayData.toDF().collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
}
test("limit") {
@@ -273,11 +271,11 @@ class DataFrameSuite extends QueryTest {
testData.take(10).toSeq)
checkAnswer(
- arrayData.toDF.limit(1),
+ arrayData.toDF().limit(1),
arrayData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq)))
checkAnswer(
- mapData.toDF.limit(1),
+ mapData.toDF().limit(1),
mapData.take(1).map(r => Row.fromSeq(r.productIterator.toSeq)))
}
@@ -411,7 +409,7 @@ class DataFrameSuite extends QueryTest {
}
test("addColumn") {
- val df = testData.toDF.withColumn("newCol", col("key") + 1)
+ val df = testData.toDF().withColumn("newCol", col("key") + 1)
checkAnswer(
df,
testData.collect().map { case Row(key: Int, value: String) =>
@@ -421,7 +419,7 @@ class DataFrameSuite extends QueryTest {
}
test("renameColumn") {
- val df = testData.toDF.withColumn("newCol", col("key") + 1)
+ val df = testData.toDF().withColumn("newCol", col("key") + 1)
.withColumnRenamed("value", "valueRenamed")
checkAnswer(
df,
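A hedged sketch of the two DataFrame operations exercised by the `addColumn` and `renameColumn` tests above, assuming the input has an integer `key` column and a string `value` column:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch only: derive a new column, then rename an existing one.
def decorate(df: DataFrame): DataFrame =
  df.withColumn("newCol", col("key") + 1)
    .withColumnRenamed("value", "valueRenamed")
```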
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index fd73065c4a..dd0948ad82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -40,8 +40,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
def assertJoin(sqlString: String, c: Class[_]): Any = {
- val rdd = sql(sqlString)
- val physical = rdd.queryExecution.sparkPlan
+ val df = sql(sqlString)
+ val physical = df.queryExecution.sparkPlan
val operators = physical.collect {
case j: ShuffledHashJoin => j
case j: HashOuterJoin => j
@@ -410,8 +410,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
test("left semi join") {
- val rdd = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a")
- checkAnswer(rdd,
+ val df = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a")
+ checkAnswer(df,
Row(1, 1) ::
Row(1, 2) ::
Row(2, 1) ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index dfb6858957..9b4dd6c620 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -35,36 +35,36 @@ class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
* none of keywords are listed in the answer
- * @param rdd the [[DataFrame]] to be executed
+ * @param df the [[DataFrame]] to be executed
* @param exists true for make sure the keywords are listed in the output, otherwise
* to make sure none of the keyword are not listed in the output
* @param keywords keyword in string array
*/
- def checkExistence(rdd: DataFrame, exists: Boolean, keywords: String*) {
- val outputs = rdd.collect().map(_.mkString).mkString
+ def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) {
+ val outputs = df.collect().map(_.mkString).mkString
for (key <- keywords) {
if (exists) {
- assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)")
+ assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
} else {
- assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)")
+ assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
}
}
}
/**
* Runs the plan and makes sure the answer matches the expected result.
- * @param rdd the [[DataFrame]] to be executed
+ * @param df the [[DataFrame]] to be executed
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
- protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = {
- QueryTest.checkAnswer(rdd, expectedAnswer) match {
+ protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+ QueryTest.checkAnswer(df, expectedAnswer) match {
case Some(errorMessage) => fail(errorMessage)
case None =>
}
}
- protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
- checkAnswer(rdd, Seq(expectedAnswer))
+ protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
+ checkAnswer(df, Seq(expectedAnswer))
}
def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
@@ -95,11 +95,11 @@ object QueryTest {
* If there was exception during the execution or the contents of the DataFrame does not
* match the expected result, an error message will be returned. Otherwise, a [[None]] will
* be returned.
- * @param rdd the [[DataFrame]] to be executed
+ * @param df the [[DataFrame]] to be executed
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
- def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
- val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+ def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
+ val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
// For BigDecimal type, the Scala type has a better definition of equality test (similar to
@@ -110,14 +110,14 @@ object QueryTest {
case o => o
})
}
- if (!isSorted) converted.sortBy(_.toString) else converted
+ if (!isSorted) converted.sortBy(_.toString()) else converted
}
- val sparkAnswer = try rdd.collect().toSeq catch {
+ val sparkAnswer = try df.collect().toSeq catch {
case e: Exception =>
val errorMessage =
s"""
|Exception thrown while executing query:
- |${rdd.queryExecution}
+ |${df.queryExecution}
|== Exception ==
|$e
|${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
@@ -129,17 +129,17 @@ object QueryTest {
val errorMessage =
s"""
|Results do not match for query:
- |${rdd.logicalPlan}
+ |${df.logicalPlan}
|== Analyzed Plan ==
- |${rdd.queryExecution.analyzed}
+ |${df.queryExecution.analyzed}
|== Physical Plan ==
- |${rdd.queryExecution.executedPlan}
+ |${df.queryExecution.executedPlan}
|== Results ==
|${sideBySide(
s"== Correct Answer - ${expectedAnswer.size} ==" +:
- prepareAnswer(expectedAnswer).map(_.toString),
+ prepareAnswer(expectedAnswer).map(_.toString()),
s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
""".stripMargin
return Some(errorMessage)
}
@@ -147,8 +147,8 @@ object QueryTest {
return None
}
- def checkAnswer(rdd: DataFrame, expectedAnswer: java.util.List[Row]): String = {
- checkAnswer(rdd, expectedAnswer.toSeq) match {
+ def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+ checkAnswer(df, expectedAnswer.toSeq) match {
case Some(errorMessage) => errorMessage
case None => null
}
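Unless the logical plan contains a `Sort`, `checkAnswer` compares results order-insensitively by canonicalising row order before the comparison. A simplified standalone sketch of that idea (the real code also normalises types such as `BigDecimal`):

```scala
import org.apache.spark.sql.Row

// Sketch only: compare rendered rows, sorting both sides unless the query orders its output.
def sameAnswer(actual: Seq[Row], expected: Seq[Row], isSorted: Boolean): Boolean = {
  def prepare(rows: Seq[Row]): Seq[String] = {
    val rendered = rows.map(_.toString())
    if (isSorted) rendered else rendered.sorted
  }
  prepare(actual) == prepare(expected)
}
```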
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 97684f75e7..097bf0dd23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1034,10 +1034,10 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("Supporting relational operator '<=>' in Spark SQL") {
val nullCheckData1 = TestData(1,"1") :: TestData(2,null) :: Nil
val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
- rdd1.toDF.registerTempTable("nulldata1")
+ rdd1.toDF().registerTempTable("nulldata1")
val nullCheckData2 = TestData(1,"1") :: TestData(2,null) :: Nil
val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
- rdd2.toDF.registerTempTable("nulldata2")
+ rdd2.toDF().registerTempTable("nulldata2")
checkAnswer(sql("SELECT nulldata1.key FROM nulldata1 join " +
"nulldata2 on nulldata1.value <=> nulldata2.value"),
(1 to 2).map(i => Row(i)))
@@ -1046,7 +1046,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("Multi-column COUNT(DISTINCT ...)") {
val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil
val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
- rdd.toDF.registerTempTable("distinctData")
+ rdd.toDF().registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index 9a48f8d063..23df6e7eac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -82,7 +82,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
new java.math.BigDecimal(1), new Date(12345), new Timestamp(12345), Seq(1,2,3))
val rdd = sparkContext.parallelize(data :: Nil)
- rdd.toDF.registerTempTable("reflectData")
+ rdd.toDF().registerTempTable("reflectData")
assert(sql("SELECT * FROM reflectData").collect().head ===
Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
@@ -93,7 +93,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
test("query case class RDD with nulls") {
val data = NullReflectData(null, null, null, null, null, null, null)
val rdd = sparkContext.parallelize(data :: Nil)
- rdd.toDF.registerTempTable("reflectNullData")
+ rdd.toDF().registerTempTable("reflectNullData")
assert(sql("SELECT * FROM reflectNullData").collect().head === Row.fromSeq(Seq.fill(7)(null)))
}
@@ -101,7 +101,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
test("query case class RDD with Nones") {
val data = OptionalReflectData(None, None, None, None, None, None, None)
val rdd = sparkContext.parallelize(data :: Nil)
- rdd.toDF.registerTempTable("reflectOptionalData")
+ rdd.toDF().registerTempTable("reflectOptionalData")
assert(sql("SELECT * FROM reflectOptionalData").collect().head === Row.fromSeq(Seq.fill(7)(null)))
}
@@ -109,7 +109,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
// Equality is broken for Arrays, so we test that separately.
test("query binary data") {
val rdd = sparkContext.parallelize(ReflectBinary(Array[Byte](1)) :: Nil)
- rdd.toDF.registerTempTable("reflectBinary")
+ rdd.toDF().registerTempTable("reflectBinary")
val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
assert(result.toSeq === Seq[Byte](1))
@@ -128,7 +128,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
Map(10 -> Some(100L), 20 -> Some(200L), 30 -> None),
Nested(None, "abc")))
val rdd = sparkContext.parallelize(data :: Nil)
- rdd.toDF.registerTempTable("reflectComplexData")
+ rdd.toDF().registerTempTable("reflectComplexData")
assert(sql("SELECT * FROM reflectComplexData").collect().head ===
new GenericRow(Array[Any](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index c511eb1469..637f59b2e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -29,11 +29,11 @@ case class TestData(key: Int, value: String)
object TestData {
val testData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF
+ (1 to 100).map(i => TestData(i, i.toString))).toDF()
testData.registerTempTable("testData")
val negativeData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(-i, (-i).toString))).toDF
+ (1 to 100).map(i => TestData(-i, (-i).toString))).toDF()
negativeData.registerTempTable("negativeData")
case class LargeAndSmallInts(a: Int, b: Int)
@@ -44,7 +44,7 @@ object TestData {
LargeAndSmallInts(2147483645, 1) ::
LargeAndSmallInts(2, 2) ::
LargeAndSmallInts(2147483646, 1) ::
- LargeAndSmallInts(3, 2) :: Nil).toDF
+ LargeAndSmallInts(3, 2) :: Nil).toDF()
largeAndSmallInts.registerTempTable("largeAndSmallInts")
case class TestData2(a: Int, b: Int)
@@ -55,7 +55,7 @@ object TestData {
TestData2(2, 1) ::
TestData2(2, 2) ::
TestData2(3, 1) ::
- TestData2(3, 2) :: Nil, 2).toDF
+ TestData2(3, 2) :: Nil, 2).toDF()
testData2.registerTempTable("testData2")
case class DecimalData(a: BigDecimal, b: BigDecimal)
@@ -67,7 +67,7 @@ object TestData {
DecimalData(2, 1) ::
DecimalData(2, 2) ::
DecimalData(3, 1) ::
- DecimalData(3, 2) :: Nil).toDF
+ DecimalData(3, 2) :: Nil).toDF()
decimalData.registerTempTable("decimalData")
case class BinaryData(a: Array[Byte], b: Int)
@@ -77,14 +77,14 @@ object TestData {
BinaryData("22".getBytes(), 5) ::
BinaryData("122".getBytes(), 3) ::
BinaryData("121".getBytes(), 2) ::
- BinaryData("123".getBytes(), 4) :: Nil).toDF
+ BinaryData("123".getBytes(), 4) :: Nil).toDF()
binaryData.registerTempTable("binaryData")
case class TestData3(a: Int, b: Option[Int])
val testData3 =
TestSQLContext.sparkContext.parallelize(
TestData3(1, None) ::
- TestData3(2, Some(2)) :: Nil).toDF
+ TestData3(2, Some(2)) :: Nil).toDF()
testData3.registerTempTable("testData3")
val emptyTableData = logical.LocalRelation($"a".int, $"b".int)
@@ -97,7 +97,7 @@ object TestData {
UpperCaseData(3, "C") ::
UpperCaseData(4, "D") ::
UpperCaseData(5, "E") ::
- UpperCaseData(6, "F") :: Nil).toDF
+ UpperCaseData(6, "F") :: Nil).toDF()
upperCaseData.registerTempTable("upperCaseData")
case class LowerCaseData(n: Int, l: String)
@@ -106,7 +106,7 @@ object TestData {
LowerCaseData(1, "a") ::
LowerCaseData(2, "b") ::
LowerCaseData(3, "c") ::
- LowerCaseData(4, "d") :: Nil).toDF
+ LowerCaseData(4, "d") :: Nil).toDF()
lowerCaseData.registerTempTable("lowerCaseData")
case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]])
@@ -114,7 +114,7 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
- arrayData.toDF.registerTempTable("arrayData")
+ arrayData.toDF().registerTempTable("arrayData")
case class MapData(data: scala.collection.Map[Int, String])
val mapData =
@@ -124,18 +124,18 @@ object TestData {
MapData(Map(1 -> "a3", 2 -> "b3", 3 -> "c3")) ::
MapData(Map(1 -> "a4", 2 -> "b4")) ::
MapData(Map(1 -> "a5")) :: Nil)
- mapData.toDF.registerTempTable("mapData")
+ mapData.toDF().registerTempTable("mapData")
case class StringData(s: String)
val repeatedData =
TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
- repeatedData.toDF.registerTempTable("repeatedData")
+ repeatedData.toDF().registerTempTable("repeatedData")
val nullableRepeatedData =
TestSQLContext.sparkContext.parallelize(
List.fill(2)(StringData(null)) ++
List.fill(2)(StringData("test")))
- nullableRepeatedData.toDF.registerTempTable("nullableRepeatedData")
+ nullableRepeatedData.toDF().registerTempTable("nullableRepeatedData")
case class NullInts(a: Integer)
val nullInts =
@@ -144,7 +144,7 @@ object TestData {
NullInts(2) ::
NullInts(3) ::
NullInts(null) :: Nil
- ).toDF
+ ).toDF()
nullInts.registerTempTable("nullInts")
val allNulls =
@@ -152,7 +152,7 @@ object TestData {
NullInts(null) ::
NullInts(null) ::
NullInts(null) ::
- NullInts(null) :: Nil).toDF
+ NullInts(null) :: Nil).toDF()
allNulls.registerTempTable("allNulls")
case class NullStrings(n: Int, s: String)
@@ -160,11 +160,15 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
NullStrings(1, "abc") ::
NullStrings(2, "ABC") ::
- NullStrings(3, null) :: Nil).toDF
+ NullStrings(3, null) :: Nil).toDF()
nullStrings.registerTempTable("nullStrings")
case class TableName(tableName: String)
- TestSQLContext.sparkContext.parallelize(TableName("test") :: Nil).toDF.registerTempTable("tableName")
+ TestSQLContext
+ .sparkContext
+ .parallelize(TableName("test") :: Nil)
+ .toDF()
+ .registerTempTable("tableName")
val unparsedStrings =
TestSQLContext.sparkContext.parallelize(
@@ -177,22 +181,22 @@ object TestData {
val timestamps = TestSQLContext.sparkContext.parallelize((1 to 3).map { i =>
TimestampField(new Timestamp(i))
})
- timestamps.toDF.registerTempTable("timestamps")
+ timestamps.toDF().registerTempTable("timestamps")
case class IntField(i: Int)
// An RDD with 4 elements and 8 partitions
val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
- withEmptyParts.toDF.registerTempTable("withEmptyParts")
+ withEmptyParts.toDF().registerTempTable("withEmptyParts")
case class Person(id: Int, name: String, age: Int)
case class Salary(personId: Int, salary: Double)
val person = TestSQLContext.sparkContext.parallelize(
Person(0, "mike", 30) ::
- Person(1, "jim", 20) :: Nil).toDF
+ Person(1, "jim", 20) :: Nil).toDF()
person.registerTempTable("person")
val salary = TestSQLContext.sparkContext.parallelize(
Salary(0, 2000.0) ::
- Salary(1, 1000.0) :: Nil).toDF
+ Salary(1, 1000.0) :: Nil).toDF()
salary.registerTempTable("salary")
case class ComplexData(m: Map[Int, String], s: TestData, a: Seq[Int], b: Boolean)
@@ -200,6 +204,6 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true)
:: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false)
- :: Nil).toDF
+ :: Nil).toDF()
complexData.registerTempTable("complexData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ad2fbc3f04..ee5c7620d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -17,13 +17,13 @@
package org.apache.spark.sql.jdbc
-import java.math.BigDecimal
+import java.sql.DriverManager
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
import org.apache.spark.sql.test._
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import java.sql.DriverManager
-import TestSQLContext._
+import org.apache.spark.sql.types._
class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
val url = "jdbc:h2:mem:testdb2"
@@ -54,53 +54,53 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
StructField("seq", IntegerType) :: Nil)
test("Basic CREATE") {
- val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
+ val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
- srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false)
+ df.createJDBCTable(url, "TEST.BASICCREATETEST", false)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
}
test("CREATE with overwrite") {
- val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
- val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
+ val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
+ val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- srdd.createJDBCTable(url, "TEST.DROPTEST", false)
+ df.createJDBCTable(url, "TEST.DROPTEST", false)
assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
- srdd2.createJDBCTable(url, "TEST.DROPTEST", true)
+ df2.createJDBCTable(url, "TEST.DROPTEST", true)
assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
}
test("CREATE then INSERT to append") {
- val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
- val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
+ val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
+ val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- srdd.createJDBCTable(url, "TEST.APPENDTEST", false)
- srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
+ df.createJDBCTable(url, "TEST.APPENDTEST", false)
+ df2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
}
test("CREATE then INSERT to truncate") {
- val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
- val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
+ val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
+ val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
- srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false)
- srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true)
+ df.createJDBCTable(url, "TEST.TRUNCATETEST", false)
+ df2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true)
assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length)
}
test("Incompatible INSERT to append") {
- val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
- val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
+ val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
+ val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
- srdd.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false)
+ df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false)
intercept[org.apache.spark.SparkException] {
- srdd2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
+ df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
}
}
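The JDBC write tests above build their input from an RDD of `Row`s plus an explicit `StructType`; a hedged sketch of that construction, with hypothetical column names:

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Sketch only: assemble a two-column DataFrame from rows and an explicit schema.
def twoColumnDF(sqlContext: SQLContext): DataFrame = {
  val schema = StructType(
    StructField("name", StringType) ::
    StructField("id", IntegerType) :: Nil)
  val rows = sqlContext.sparkContext.parallelize(Seq(Row("dave", 42), Row("mary", 1)))
  sqlContext.createDataFrame(rows, schema)
}
```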
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
index 4f38110c80..5b8a76f461 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
@@ -18,18 +18,13 @@
package org.apache.spark.sql.jdbc
import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
-import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
+import java.sql.{Date, Timestamp}
+
+import com.spotify.docker.client.DockerClient
import com.spotify.docker.client.messages.ContainerConfig
-import org.scalatest.{FunSuite, BeforeAndAfterAll, Ignore}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
-import org.apache.spark.sql._
import org.apache.spark.sql.test._
-import TestSQLContext._
-
-import org.apache.spark.sql.jdbc._
class MySQLDatabase {
val docker: DockerClient = DockerClientFactory.get()
@@ -37,9 +32,9 @@ class MySQLDatabase {
println("Pulling mysql")
docker.pull("mysql")
println("Configuring container")
- val config = (ContainerConfig.builder().image("mysql")
- .env("MYSQL_ROOT_PASSWORD=rootpass")
- .build())
+ val config = ContainerConfig.builder().image("mysql")
+ .env("MYSQL_ROOT_PASSWORD=rootpass")
+ .build()
println("Creating container")
val id = docker.createContainer(config).id
println("Starting container " + id)
@@ -57,11 +52,10 @@ class MySQLDatabase {
println("Closing docker client")
DockerClientFactory.close(docker)
} catch {
- case e: Exception => {
+ case e: Exception =>
println(e)
println("You may need to clean this up manually.")
throw e
- }
}
}
}
@@ -86,10 +80,9 @@ class MySQLDatabase {
println("Database is up.")
return;
} catch {
- case e: java.sql.SQLException => {
+ case e: java.sql.SQLException =>
lastException = e
java.lang.Thread.sleep(250)
- }
}
}
}
@@ -143,8 +136,8 @@ class MySQLDatabase {
}
test("Basic test") {
- val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
- val rows = rdd.collect
+ val df = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
+ val rows = df.collect()
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 2)
@@ -153,8 +146,8 @@ class MySQLDatabase {
}
test("Numeric types") {
- val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
- val rows = rdd.collect
+ val df = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
+ val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
@@ -181,8 +174,8 @@ class MySQLDatabase {
}
test("Date types") {
- val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates")
- val rows = rdd.collect
+ val df = TestSQLContext.jdbc(url(ip, "foo"), "dates")
+ val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 5)
@@ -199,8 +192,8 @@ class MySQLDatabase {
}
test("String types") {
- val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings")
- val rows = rdd.collect
+ val df = TestSQLContext.jdbc(url(ip, "foo"), "strings")
+ val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
@@ -225,11 +218,11 @@ class MySQLDatabase {
}
test("Basic write test") {
- val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
- val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
- val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
- rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
- rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false)
- rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false)
+ val df1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
+ val df2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
+ val df3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
+ df1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
+ df2.createJDBCTable(url(ip, "foo"), "datescopy", false)
+ df3.createJDBCTable(url(ip, "foo"), "stringscopy", false)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
index 7b47feeb78..e17be99ac3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
@@ -17,13 +17,13 @@
package org.apache.spark.sql.jdbc
-import java.math.BigDecimal
-import org.apache.spark.sql.test._
-import org.scalatest.{FunSuite, BeforeAndAfterAll, Ignore}
import java.sql.DriverManager
-import TestSQLContext._
-import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
+
+import com.spotify.docker.client.DockerClient
import com.spotify.docker.client.messages.ContainerConfig
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Ignore}
+
+import org.apache.spark.sql.test._
class PostgresDatabase {
val docker: DockerClient = DockerClientFactory.get()
@@ -31,9 +31,9 @@ class PostgresDatabase {
println("Pulling postgres")
docker.pull("postgres")
println("Configuring container")
- val config = (ContainerConfig.builder().image("postgres")
- .env("POSTGRES_PASSWORD=rootpass")
- .build())
+ val config = ContainerConfig.builder().image("postgres")
+ .env("POSTGRES_PASSWORD=rootpass")
+ .build()
println("Creating container")
val id = docker.createContainer(config).id
println("Starting container " + id)
@@ -51,11 +51,10 @@ class PostgresDatabase {
println("Closing docker client")
DockerClientFactory.close(docker)
} catch {
- case e: Exception => {
+ case e: Exception =>
println(e)
println("You may need to clean this up manually.")
throw e
- }
}
}
}
@@ -79,10 +78,9 @@ class PostgresDatabase {
println("Database is up.")
return;
} catch {
- case e: java.sql.SQLException => {
+ case e: java.sql.SQLException =>
lastException = e
java.lang.Thread.sleep(250)
- }
}
}
}
@@ -113,8 +111,8 @@ class PostgresDatabase {
}
test("Type mapping for various types") {
- val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
- val rows = rdd.collect
+ val df = TestSQLContext.jdbc(url(db.ip), "public.bar")
+ val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 10)
@@ -142,8 +140,8 @@ class PostgresDatabase {
}
test("Basic write test") {
- val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
- rdd.createJDBCTable(url(db.ip), "public.barcopy", false)
+ val df = TestSQLContext.jdbc(url(db.ip), "public.bar")
+ df.createJDBCTable(url(db.ip), "public.barcopy", false)
// Test only that it doesn't bomb out.
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index eb2d5f2529..4d32e84fc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -45,7 +45,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
private def checkFilterPredicate(
- rdd: DataFrame,
+ df: DataFrame,
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
checker: (DataFrame, Seq[Row]) => Unit,
@@ -53,7 +53,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
val output = predicate.collect { case a: Attribute => a }.distinct
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
- val query = rdd
+ val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -85,36 +85,36 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
private def checkFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
- (implicit rdd: DataFrame): Unit = {
- checkFilterPredicate(rdd, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
+ (implicit df: DataFrame): Unit = {
+ checkFilterPredicate(df, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
}
private def checkFilterPredicate[T]
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
- (implicit rdd: DataFrame): Unit = {
- checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
+ (implicit df: DataFrame): Unit = {
+ checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}
private def checkBinaryFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
- (implicit rdd: DataFrame): Unit = {
- def checkBinaryAnswer(rdd: DataFrame, expected: Seq[Row]) = {
+ (implicit df: DataFrame): Unit = {
+ def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
- rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+ df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
}
}
- checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected)
+ checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected)
}
private def checkBinaryFilterPredicate
(predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
- (implicit rdd: DataFrame): Unit = {
- checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
+ (implicit df: DataFrame): Unit = {
+ checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}
test("filter pushdown - boolean") {
- withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit rdd =>
+ withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
@@ -124,7 +124,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - short") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df =>
checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
checkFilterPredicate(
Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
@@ -151,7 +151,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - integer") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
@@ -176,7 +176,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - long") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
@@ -201,7 +201,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - float") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
@@ -226,7 +226,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - double") {
- withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
@@ -251,7 +251,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}
test("filter pushdown - string") {
- withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(
'_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
@@ -282,7 +282,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}
- withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 208f35761b..36f3406a78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -73,7 +73,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
* Writes `data` to a Parquet file, reads it back and check file contents.
*/
protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = {
- withParquetRDD(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
+ withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
}
test("basic data types (without binary)") {
@@ -85,9 +85,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("raw binary") {
val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
- withParquetRDD(data) { rdd =>
+ withParquetDataFrame(data) { df =>
assertResult(data.map(_._1.mkString(",")).sorted) {
- rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
+ df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
}
}
}
@@ -106,7 +106,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
sparkContext
.parallelize(0 to 1000)
.map(i => Tuple1(i / 100.0))
- .toDF
+ .toDF()
// Parquet doesn't allow column names with spaces, have to add an alias here
.select($"_1" cast decimal as "dec")
@@ -147,9 +147,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("struct") {
val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
- withParquetRDD(data) { rdd =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
- checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ checkAnswer(df, data.map { case Tuple1(struct) =>
Row(Row(struct.productIterator.toSeq: _*))
})
}
@@ -157,9 +157,9 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("nested struct with array of array as field") {
val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
- withParquetRDD(data) { rdd =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
- checkAnswer(rdd, data.map { case Tuple1(struct) =>
+ checkAnswer(df, data.map { case Tuple1(struct) =>
Row(Row(struct.productIterator.toSeq: _*))
})
}
@@ -167,8 +167,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
test("nested map with struct as value type") {
val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
- withParquetRDD(data) { rdd =>
- checkAnswer(rdd, data.map { case Tuple1(m) =>
+ withParquetDataFrame(data) { df =>
+ checkAnswer(df, data.map { case Tuple1(m) =>
Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
})
}
@@ -182,8 +182,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
null.asInstanceOf[java.lang.Float],
null.asInstanceOf[java.lang.Double])
- withParquetRDD(allNulls :: Nil) { rdd =>
- val rows = rdd.collect()
+ withParquetDataFrame(allNulls :: Nil) { df =>
+ val rows = df.collect()
assert(rows.size === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
}
@@ -195,8 +195,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
None.asInstanceOf[Option[Long]],
None.asInstanceOf[Option[String]])
- withParquetRDD(allNones :: Nil) { rdd =>
- val rows = rdd.collect()
+ withParquetDataFrame(allNones :: Nil) { df =>
+ val rows = df.collect()
assert(rows.size === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 8b4d05ec54..b98ba09ccf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -68,7 +68,7 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
val queryOutput = selfJoin.queryExecution.analyzed.output
- assertResult(4, "Field count mismatche")(queryOutput.size)
+ assertResult(4, "Field count mismatches")(queryOutput.size)
assertResult(2, "Duplicated expression ID in query plan:\n $selfJoin") {
queryOutput.filter(_.name == "_1").map(_.exprId).size
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 2e6c2d5f9a..ad880e2bc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -36,8 +36,8 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
private def testSchema[T <: Product: ClassTag: TypeTag](
testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
test(testName) {
- val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T],
- isThriftDerived)
+ val actual = ParquetTypesConverter.convertFromAttributes(
+ ScalaReflection.attributesFor[T], isThriftDerived)
val expected = MessageTypeParser.parseMessageType(messageType)
actual.checkContains(expected)
expected.checkContains(actual)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 9fcb04ca23..d4b175fa44 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -37,7 +37,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._
val testData = TestHive.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF
+ (1 to 100).map(i => TestData(i, i.toString))).toDF()
before {
// Since every we are doing tests for DDL statements,
@@ -65,7 +65,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
+ testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
)
// Now overwrite.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e5156ae821..0bd82773f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
test("check change without refresh") {
val tempDir = File.createTempFile("sparksql", "json")
tempDir.delete()
- sparkContext.parallelize(("a", "b") :: Nil).toDF
+ sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql(
@@ -171,7 +171,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
- sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF
+ sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
// Schema is cached so the new column does not show. The updated values in existing columns
@@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
test("drop, change, recreate") {
val tempDir = File.createTempFile("sparksql", "json")
tempDir.delete()
- sparkContext.parallelize(("a", "b") :: Nil).toDF
+ sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql(
@@ -209,7 +209,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
FileUtils.deleteDirectory(tempDir)
- sparkContext.parallelize(("a", "b", "c") :: Nil).toDF
+ sparkContext.parallelize(("a", "b", "c") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql("DROP TABLE jsonTable")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6f07fd5a87..1e05a024b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -127,11 +127,11 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
}
test("estimates the size of a test MetastoreRelation") {
- val rdd = sql("""SELECT * FROM src""")
- val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+ val df = sql("""SELECT * FROM src""")
+ val sizes = df.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.statistics.sizeInBytes
}
- assert(sizes.size === 1, s"Size wrong for:\n ${rdd.queryExecution}")
+ assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(5812)),
s"expected exact size 5812 for test table 'src', got: ${sizes(0)}")
}
@@ -145,10 +145,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
ct: ClassTag[_]) = {
before()
- var rdd = sql(query)
+ var df = sql(query)
// Assert src has a size smaller than the threshold.
- val sizes = rdd.queryExecution.analyzed.collect {
+ val sizes = df.queryExecution.analyzed.collect {
case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
}
assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold
@@ -157,21 +157,21 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.size === 1,
- s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+ s"actual query plans do not contain broadcast join: ${df.queryExecution}")
- checkAnswer(rdd, expectedAnswer) // check correctness of output
+ checkAnswer(df, expectedAnswer) // check correctness of output
TestHive.conf.settings.synchronized {
val tmp = conf.autoBroadcastJoinThreshold
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
- rdd = sql(query)
- bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+ df = sql(query)
+ bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+ val shj = df.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
assert(shj.size === 1,
"ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
@@ -199,10 +199,10 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
|left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin
val answer = Row(86, "val_86")
- var rdd = sql(leftSemiJoinQuery)
+ var df = sql(leftSemiJoinQuery)
// Assert src has a size smaller than the threshold.
- val sizes = rdd.queryExecution.analyzed.collect {
+ val sizes = df.queryExecution.analyzed.collect {
case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass
.isAssignableFrom(r.getClass) =>
r.statistics.sizeInBytes
@@ -213,25 +213,25 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
- var bhj = rdd.queryExecution.sparkPlan.collect {
+ var bhj = df.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.size === 1,
- s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+ s"actual query plans do not contain broadcast join: ${df.queryExecution}")
- checkAnswer(rdd, answer) // check correctness of output
+ checkAnswer(df, answer) // check correctness of output
TestHive.conf.settings.synchronized {
val tmp = conf.autoBroadcastJoinThreshold
sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
- rdd = sql(leftSemiJoinQuery)
- bhj = rdd.queryExecution.sparkPlan.collect {
+ df = sql(leftSemiJoinQuery)
+ bhj = df.queryExecution.sparkPlan.collect {
case j: BroadcastLeftSemiJoinHash => j
}
assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
- val shj = rdd.queryExecution.sparkPlan.collect {
+ val shj = df.queryExecution.sparkPlan.collect {
case j: LeftSemiJoinHash => j
}
assert(shj.size === 1,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 955f3f51cf..bb0a67dc03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -429,7 +429,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES
|('serialization.last.column.takes.rest'='true') FROM src;
""".stripMargin.replaceAll("\n", " "))
-
+
createQueryTest("LIKE",
"SELECT * FROM src WHERE value LIKE '%1%'")
@@ -567,7 +567,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(2, "str2") :: Nil)
- testData.toDF.registerTempTable("REGisteredTABle")
+ testData.toDF().registerTempTable("REGisteredTABle")
assertResult(Array(Row(2, "str2"))) {
sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " +
@@ -583,8 +583,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-1704: Explain commands as a DataFrame") {
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- val rdd = sql("explain select key, count(value) from src group by key")
- assert(isExplanation(rdd))
+ val df = sql("explain select key, count(value) from src group by key")
+ assert(isExplanation(df))
TestHive.reset()
}
@@ -592,7 +592,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") {
val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
.zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)}
- TestHive.sparkContext.parallelize(fixture).toDF.registerTempTable("having_test")
+ TestHive.sparkContext.parallelize(fixture).toDF().registerTempTable("having_test")
val results =
sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
.collect()
@@ -740,7 +740,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(1, "str2") :: Nil)
- testData.toDF.registerTempTable("test_describe_commands2")
+ testData.toDF().registerTempTable("test_describe_commands2")
assertResult(
Array(
@@ -900,8 +900,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
- sparkContext.makeRDD(Seq.empty[LogEntry]).toDF.registerTempTable("rawLogs")
- sparkContext.makeRDD(Seq.empty[LogFile]).toDF.registerTempTable("logFiles")
+ sparkContext.makeRDD(Seq.empty[LogEntry]).toDF().registerTempTable("rawLogs")
+ sparkContext.makeRDD(Seq.empty[LogFile]).toDF().registerTempTable("logFiles")
sql(
"""
@@ -979,8 +979,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
val KV = "([^=]+)=([^=]*)".r
- def collectResults(rdd: DataFrame): Set[(String, String)] =
- rdd.collect().map {
+ def collectResults(df: DataFrame): Set[(String, String)] =
+ df.collect().map {
case Row(key: String, value: String) => key -> value
case Row(KV(key, value)) => key -> value
}.toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 6fc4cc1426..f4440e5b78 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest {
test("case insensitivity with scala reflection") {
// Test resolution with Scala Reflection
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("caseSensitivityTest")
+ .toDF().registerTempTable("caseSensitivityTest")
val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"),
@@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
ignore("case insensitivity with scala reflection joins") {
// Test resolution with Scala Reflection
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("caseSensitivityTest")
+ .toDF().registerTempTable("caseSensitivityTest")
sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect()
}
test("nested repeated resolution") {
sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
- .toDF.registerTempTable("nestedRepeatedTest")
+ .toDF().registerTempTable("nestedRepeatedTest")
assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
}
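
The recurring change across the Hive suites is purely stylistic: `toDF` is now written with explicit empty parentheses, `toDF()`, so the conversion from an RDD of case classes to a DataFrame reads as a method call rather than a field access. A small self-contained sketch of that style follows; the class and table names are illustrative, not from this patch, and it assumes a Spark 1.3-era SQLContext.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    case class Person(id: Int, name: String)

    object ToDFStyleExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("toDF-style").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Preferred style after this cleanup: explicit parentheses on toDF().
        val people = sc.parallelize(
          Person(1, "str1") :: Person(2, "str2") :: Nil).toDF()
        people.registerTempTable("people")

        sqlContext.sql("SELECT name FROM people WHERE id = 2")
          .collect().foreach(println)

        sc.stop()
      }
    }
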
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 245161d2eb..cb405f56bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest {
| getStruct(1).f5 FROM src LIMIT 1
""".stripMargin).head() === Row(1, 2, 3, 4, 5))
}
-
+
test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") {
checkAnswer(
sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"),
@@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest {
test("SPARK-2693 udaf aggregates test") {
checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src").collect().toSeq)
-
+
checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"),
sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq)
}
@@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest {
test("Generic UDAF aggregates") {
checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999)) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)
-
+
checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"),
sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq)
}
-
+
test("UDFIntegerToString") {
val testData = TestHive.sparkContext.parallelize(
- IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF
+ IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
testData.registerTempTable("integerTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'")
@@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest {
val testData = TestHive.sparkContext.parallelize(
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
- ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF
+ ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
testData.registerTempTable("listListIntTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
@@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest {
test("UDFListString") {
val testData = TestHive.sparkContext.parallelize(
ListStringCaseClass(Seq("a", "b", "c")) ::
- ListStringCaseClass(Seq("d", "e")) :: Nil).toDF
+ ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
testData.registerTempTable("listStringTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
@@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest {
test("UDFStringString") {
val testData = TestHive.sparkContext.parallelize(
- StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF
+ StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
testData.registerTempTable("stringTable")
sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'")
@@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest {
ListListIntCaseClass(Nil) ::
ListListIntCaseClass(Seq((1, 2, 3))) ::
ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
- Nil).toDF
+ Nil).toDF()
testData.registerTempTable("TwoListTable")
sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")