author     Reynold Xin <rxin@databricks.com>  2015-01-28 12:10:01 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-01-28 12:10:01 -0800
commit     c8e934ef3cd06f02f9a2946e96a1a52293c22490 (patch)
tree       4113fbfe9e22f50899b9c00e0267fa7fefd18c0c
parent     453d7999b88be87bda30d9e73038eb484ee063bd (diff)
[SPARK-5447][SQL] Replaced reference to SchemaRDD with DataFrame.
and [SPARK-5448][SQL] Make CacheManager a concrete class and field in SQLContext

Author: Reynold Xin <rxin@databricks.com>

Closes #4242 from rxin/sqlCleanup and squashes the following commits:

e351cb2 [Reynold Xin] Fixed toDataFrame.
6545c42 [Reynold Xin] More changes.
728c017 [Reynold Xin] [SPARK-5447][SQL] Replaced reference to SchemaRDD with DataFrame.
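For context, here is a minimal sketch of how user code reads after the rename, assuming Spark at this commit and local mode; the object, app and table names are illustrative only and not part of the patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Illustrative record type; any case class works with the implicit conversion.
case class Record(key: Int, value: String)

object RenameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rename-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createDataFrame               // formerly: import sqlContext.createSchemaRDD

    val df: DataFrame = sc.parallelize(1 to 10)
      .map(i => Record(i, i.toString))
      .toDataFrame                                  // formerly: .toSchemaRDD
    df.registerTempTable("records")
    sqlContext.sql("SELECT value FROM records WHERE key > 5").collect().foreach(println)
    sc.stop()
  }
}

The sketch mirrors the ReplSuite change below, which exercises exactly this import-plus-implicit-conversion path.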
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala | 2
-rw-r--r-- mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala | 2
-rw-r--r-- mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 2
-rw-r--r-- mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala | 2
-rw-r--r-- repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 6
-rw-r--r-- sql/README.md | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 102
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 12
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 12
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/TestData.scala | 22
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 120
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 18
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2
33 files changed, 217 insertions, 203 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
index f229a58985..ab58375649 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
@@ -81,7 +81,7 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")
// Convert input data to DataFrame explicitly.
- val df: DataFrame = origData.toDF
+ val df: DataFrame = origData.toDataFrame
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index f6437c7fbc..f0bea5f469 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -148,7 +148,7 @@ class ALSModel private[ml] (
}
private object ALSModel {
- /** Case class to convert factors to SchemaRDDs */
+ /** Case class to convert factors to [[DataFrame]]s */
private case class Factor(id: Int, features: Seq[Float])
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 2834ea75ce..31c33f1bf6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -110,7 +110,7 @@ sealed trait Vector extends Serializable {
/**
* User-defined type for [[Vector]] which allows easy interaction with SQL
- * via [[org.apache.spark.sql.SchemaRDD]].
+ * via [[org.apache.spark.sql.DataFrame]].
*/
private[spark] class VectorUDT extends UserDefinedType[Vector] {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 1912afce93..33e40dc741 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -31,7 +31,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
sqlContext = new SQLContext(sc)
- dataset = sqlContext.createSchemaRDD(
+ dataset = sqlContext.createDataFrame(
sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 58289acdbc..9da253c61d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -350,7 +350,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging {
numItemBlocks: Int = 3,
targetRMSE: Double = 0.05): Unit = {
val sqlContext = this.sqlContext
- import sqlContext.createSchemaRDD
+ import sqlContext.createDataFrame
val als = new ALS()
.setRank(rank)
.setRegParam(regParam)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
index 74104fa7a6..761ea821ef 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
@@ -32,7 +32,7 @@ class CrossValidatorSuite extends FunSuite with MLlibTestSparkContext {
override def beforeAll(): Unit = {
super.beforeAll()
val sqlContext = new SQLContext(sc)
- dataset = sqlContext.createSchemaRDD(
+ dataset = sqlContext.createDataFrame(
sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
}
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 91c9c52c3c..e594ad868e 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -255,14 +255,14 @@ class ReplSuite extends FunSuite {
assertDoesNotContain("Exception", output)
}
- test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+ test("SPARK-2576 importing SQLContext.createDataFrame.") {
// We need to use local-cluster to test this case.
val output = runInterpreter("local-cluster[1,1,512]",
"""
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- |import sqlContext.createSchemaRDD
+ |import sqlContext.createDataFrame
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDataFrame.collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
diff --git a/sql/README.md b/sql/README.md
index d058a6b011..61a20916a9 100644
--- a/sql/README.md
+++ b/sql/README.md
@@ -44,7 +44,7 @@ Type in expressions to have them evaluated.
Type :help for more information.
scala> val query = sql("SELECT * FROM (SELECT * FROM src) a")
-query: org.apache.spark.sql.SchemaRDD =
+query: org.apache.spark.sql.DataFrame =
== Query Plan ==
== Physical Plan ==
HiveTableScan [key#10,value#11], (MetastoreRelation default, src, None), None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index 9f30f40a17..6ab99aa388 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -930,13 +930,13 @@ case class MapType(
*
* This interface allows a user to make their own classes more interoperable with SparkSQL;
* e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
- * a SchemaRDD which has class X in the schema.
+ * a `DataFrame` which has class X in the schema.
*
* For SparkSQL to recognize UDTs, the UDT must be annotated with
* [[SQLUserDefinedType]].
*
- * The conversion via `serialize` occurs when instantiating a `SchemaRDD` from another RDD.
- * The conversion via `deserialize` occurs when reading from a `SchemaRDD`.
+ * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD.
+ * The conversion via `deserialize` occurs when reading from a `DataFrame`.
*/
@DeveloperApi
abstract class UserDefinedType[UserType] extends DataType with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index bc22f68833..f1949aa5dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.storage.StorageLevel
@@ -32,9 +33,10 @@ private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryR
* results when subsequent queries are executed. Data is cached using byte buffers stored in an
* InMemoryRelation. This relation is automatically substituted query plans that return the
* `sameResult` as the originally cached query.
+ *
+ * Internal to Spark SQL.
*/
-private[sql] trait CacheManager {
- self: SQLContext =>
+private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
@transient
private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
@@ -43,13 +45,13 @@ private[sql] trait CacheManager {
private val cacheLock = new ReentrantReadWriteLock
/** Returns true if the table is currently cached in-memory. */
- def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty
+ def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
/** Caches the specified table in-memory. */
- def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName))
+ def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
/** Removes the specified table from the in-memory cache. */
- def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName))
+ def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
/** Acquires a read lock on the cache for the duration of `f`. */
private def readLock[A](f: => A): A = {
@@ -91,15 +93,15 @@ private[sql] trait CacheManager {
CachedData(
planToCache,
InMemoryRelation(
- conf.useCompression,
- conf.columnBatchSize,
+ sqlContext.conf.useCompression,
+ sqlContext.conf.columnBatchSize,
storageLevel,
query.queryExecution.executedPlan,
tableName))
}
}
- /** Removes the data for the given SchemaRDD from the cache */
+ /** Removes the data for the given [[DataFrame]] from the cache */
private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
@@ -108,7 +110,7 @@ private[sql] trait CacheManager {
cachedData.remove(dataIndex)
}
- /** Tries to remove the data for the given SchemaRDD from the cache if it's cached */
+ /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */
private[sql] def tryUncacheQuery(
query: DataFrame,
blocking: Boolean = true): Boolean = writeLock {
@@ -122,7 +124,7 @@ private[sql] trait CacheManager {
found
}
- /** Optionally returns cached data for the given SchemaRDD */
+ /** Optionally returns cached data for the given [[DataFrame]] */
private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock {
lookupCachedData(query.queryExecution.analyzed)
}
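The structural change above is a trait-to-class refactoring: CacheManager used to be a self-typed trait mixed into SQLContext, and is now a plain class handed the SQLContext it serves. A stripped-down sketch of the same pattern, using hypothetical Context/Cache types rather than Spark's real ones:

import scala.collection.mutable

// Before: trait Cache { self: Context => ... } mixed into Context.
// After: a concrete class that receives the context it needs.
class Cache(ctx: Context) {
  private val cached = mutable.Map.empty[String, Seq[String]]

  def cacheTable(name: String): Unit   = cached(name) = ctx.table(name)
  def isCached(name: String): Boolean  = cached.contains(name)
  def uncacheTable(name: String): Unit = cached.remove(name)
}

class Context {
  // The context now owns a cache manager instead of inheriting its methods,
  // and forwards the public caching API to it (mirroring SQLContext.cacheManager).
  val cacheManager = new Cache(this)
  private val tables = Map("t" -> Seq("row1", "row2"))

  def table(name: String): Seq[String] = tables(name)
  def cacheTable(name: String): Unit   = cacheManager.cacheTable(name)
  def isCached(name: String): Boolean  = cacheManager.isCached(name)
  def uncacheTable(name: String): Unit = cacheManager.uncacheTable(name)
}

The forwarding methods keep source compatibility for callers of isCached/cacheTable/uncacheTable while letting the cache logic live in one concrete, independently testable object.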
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3198215b2c..ff59cbf3c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -106,7 +106,7 @@ class DataFrame protected[sql](
* An implicit conversion function internal to this class for us to avoid doing
* "new DataFrame(...)" everywhere.
*/
- private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = {
+ private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
new DataFrame(sqlContext, logicalPlan, true)
}
@@ -130,7 +130,7 @@ class DataFrame protected[sql](
/**
* Return the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
*/
- def toDF: DataFrame = this
+ def toDataFrame: DataFrame = this
/** Return the schema of this [[DataFrame]]. */
override def schema: StructType = queryExecution.analyzed.schema
@@ -496,17 +496,17 @@ class DataFrame protected[sql](
}
override def persist(): this.type = {
- sqlContext.cacheQuery(this)
+ sqlContext.cacheManager.cacheQuery(this)
this
}
override def persist(newLevel: StorageLevel): this.type = {
- sqlContext.cacheQuery(this, None, newLevel)
+ sqlContext.cacheManager.cacheQuery(this, None, newLevel)
this
}
override def unpersist(blocking: Boolean): this.type = {
- sqlContext.tryUncacheQuery(this, blocking)
+ sqlContext.cacheManager.tryUncacheQuery(this, blocking)
this
}
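At the DataFrame level the only visible effect is that persist/unpersist now route through sqlContext.cacheManager. A hedged usage sketch (local mode; the Point type and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

case class Point(x: Int, y: Int)

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persist-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createDataFrame

    val df = sc.parallelize(Seq(Point(1, 2), Point(3, 4))).toDataFrame
    df.persist(StorageLevel.MEMORY_AND_DISK)  // delegates to cacheManager.cacheQuery(this, None, newLevel)
    df.count()                                // first action materializes the cached InMemoryRelation
    df.unpersist(blocking = true)             // delegates to cacheManager.tryUncacheQuery(this, blocking)
    sc.stop()
  }
}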
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5030e689c3..d56d4052a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -51,7 +51,6 @@ import org.apache.spark.util.Utils
@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
extends org.apache.spark.Logging
- with CacheManager
with Serializable {
self =>
@@ -117,12 +116,57 @@ class SQLContext(@transient val sparkContext: SparkContext)
case _ =>
}
+ protected[sql] val cacheManager = new CacheManager(this)
+
+ /**
+ * A collection of methods that are considered experimental, but can be used to hook into
+ * the query planner for advanced functionalities.
+ */
+ val experimental: ExperimentalMethods = new ExperimentalMethods(this)
+
+ /**
+ * A collection of methods for registering user-defined functions (UDF).
+ *
+ * The following example registers a Scala closure as UDF:
+ * {{{
+ * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1)
+ * }}}
+ *
+ * The following example registers a UDF in Java:
+ * {{{
+ * sqlContext.udf().register("myUDF",
+ * new UDF2<Integer, String, String>() {
+ * @Override
+ * public String call(Integer arg1, String arg2) {
+ * return arg2 + arg1;
+ * }
+ * }, DataTypes.StringType);
+ * }}}
+ *
+ * Or, to use Java 8 lambda syntax:
+ * {{{
+ * sqlContext.udf().register("myUDF",
+ * (Integer arg1, String arg2) -> arg2 + arg1),
+ * DataTypes.StringType);
+ * }}}
+ */
+ val udf: UDFRegistration = new UDFRegistration(this)
+
+ /** Returns true if the table is currently cached in-memory. */
+ def isCached(tableName: String): Boolean = cacheManager.isCached(tableName)
+
+ /** Caches the specified table in-memory. */
+ def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName)
+
+ /** Removes the specified table from the in-memory cache. */
+ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
+
/**
- * Creates a SchemaRDD from an RDD of case classes.
+ * Creates a DataFrame from an RDD of case classes.
*
* @group userf
*/
- implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = {
+ implicit def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = {
SparkPlan.currentContext.set(self)
val attributeSeq = ScalaReflection.attributesFor[A]
val schema = StructType.fromAttributes(attributeSeq)
@@ -133,7 +177,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]].
*/
- def baseRelationToSchemaRDD(baseRelation: BaseRelation): DataFrame = {
+ def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
new DataFrame(this, LogicalRelation(baseRelation))
}
@@ -155,13 +199,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
* val people =
* sc.textFile("examples/src/main/resources/people.txt").map(
* _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
- * val peopleSchemaRDD = sqlContext. applySchema(people, schema)
- * peopleSchemaRDD.printSchema
+ * val dataFrame = sqlContext. applySchema(people, schema)
+ * dataFrame.printSchema
* // root
* // |-- name: string (nullable = false)
* // |-- age: integer (nullable = true)
*
- * peopleSchemaRDD.registerTempTable("people")
+ * dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
*
@@ -169,7 +213,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@DeveloperApi
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
- // TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
+ // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, rowRDD)(self)
new DataFrame(this, logicalPlan)
@@ -309,12 +353,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def dropTempTable(tableName: String): Unit = {
- tryUncacheQuery(table(tableName))
+ cacheManager.tryUncacheQuery(table(tableName))
catalog.unregisterTable(Seq(tableName))
}
/**
- * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
+ * Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
* @group userf
@@ -327,44 +371,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
- /** Returns the specified table as a SchemaRDD */
+ /** Returns the specified table as a [[DataFrame]]. */
def table(tableName: String): DataFrame =
new DataFrame(this, catalog.lookupRelation(Seq(tableName)))
- /**
- * A collection of methods that are considered experimental, but can be used to hook into
- * the query planner for advanced functionalities.
- */
- val experimental: ExperimentalMethods = new ExperimentalMethods(this)
-
- /**
- * A collection of methods for registering user-defined functions (UDF).
- *
- * The following example registers a Scala closure as UDF:
- * {{{
- * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1)
- * }}}
- *
- * The following example registers a UDF in Java:
- * {{{
- * sqlContext.udf().register("myUDF",
- * new UDF2<Integer, String, String>() {
- * @Override
- * public String call(Integer arg1, String arg2) {
- * return arg2 + arg1;
- * }
- * }, DataTypes.StringType);
- * }}}
- *
- * Or, to use Java 8 lambda syntax:
- * {{{
- * sqlContext.udf().register("myUDF",
- * (Integer arg1, String arg2) -> arg2 + arg1),
- * DataTypes.StringType);
- * }}}
- */
- val udf: UDFRegistration = new UDFRegistration(this)
-
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
@@ -455,7 +465,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected class QueryExecution(val logical: LogicalPlan) {
lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical))
- lazy val withCachedData: LogicalPlan = useCachedData(analyzed)
+ lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
// TODO: Don't just pick the first one...
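Putting the SQLContext changes together: isCached, cacheTable and uncacheTable are now thin forwards to the new cacheManager field, and the implicit conversion is spelled createDataFrame. A minimal sketch of the table-level caching surface (record type, app and table names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class KV(key: Int, value: String)

object CacheTableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cache-table-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createDataFrame

    sc.parallelize(1 to 100).map(i => KV(i, i.toString)).toDataFrame.registerTempTable("kv")

    sqlContext.cacheTable("kv")                          // forwards to cacheManager.cacheTable("kv")
    assert(sqlContext.isCached("kv"))                    // forwards to cacheManager.isCached("kv")
    sqlContext.sql("SELECT count(*) FROM kv").collect()  // plan substitutes the cached InMemoryRelation
    sqlContext.uncacheTable("kv")                        // forwards to cacheManager.uncacheTable("kv")
    sc.stop()
  }
}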
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
index 2e9d037f93..1beb19437a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList, Map => JMap}
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.Accumulator
+import org.apache.spark.{Accumulator, Logging}
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.api.java._
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.DataType
/**
* Functions for registering user-defined functions.
*/
-class UDFRegistration (sqlContext: SQLContext) extends org.apache.spark.Logging {
+class UDFRegistration(sqlContext: SQLContext) extends Logging {
private val functionRegistry = sqlContext.functionRegistry
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index aeb0960e87..5cc67cdd13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -39,7 +39,7 @@ package object debug {
/**
* :: DeveloperApi ::
- * Augments SchemaRDDs with debug methods.
+ * Augments [[DataFrame]]s with debug methods.
*/
@DeveloperApi
implicit class DebugQuery(query: DataFrame) {
@@ -166,7 +166,7 @@ package object debug {
/**
* :: DeveloperApi ::
- * Augments SchemaRDDs with debug methods.
+ * Augments [[DataFrame]]s with debug methods.
*/
@DeveloperApi
private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index cde5160149..a54485e719 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -26,7 +26,7 @@ import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -34,8 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
/**
* Relation that consists of data stored in a Parquet columnar format.
*
- * Users should interact with parquet files though a SchemaRDD, created by a [[SQLContext]] instead
- * of using this class directly.
+ * Users should interact with parquet files though a [[DataFrame]], created by a [[SQLContext]]
+ * instead of using this class directly.
*
* {{{
* val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 0b312ef51d..9d6c529574 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -95,8 +95,8 @@ trait ParquetTest {
}
/**
- * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`.
- * The Parquet file will be deleted after `f` returns.
+ * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
+ * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
*/
protected def withParquetRDD[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
@@ -112,9 +112,9 @@ trait ParquetTest {
}
/**
- * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary
- * table named `tableName`, then call `f`. The temporary table together with the Parquet file will
- * be dropped/deleted after `f` returns.
+ * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
+ * temporary table named `tableName`, then call `f`. The temporary table together with the
+ * Parquet file will be dropped/deleted after `f` returns.
*/
protected def withParquetTable[T <: Product: ClassTag: TypeTag]
(data: Seq[T], tableName: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 2564c849b8..906455dd40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -37,8 +37,8 @@ object TestSQLContext
}
/**
- * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
- * construct SchemaRDD directly out of local data without relying on implicits.
+ * Turn a logical plan into a [[DataFrame]]. This should be removed once we have an easier way to
+ * construct [[DataFrame]] directly out of local data without relying on implicits.
*/
protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = {
new DataFrame(this, plan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 34763156a6..e1e96926cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -51,17 +51,17 @@ class CachedTableSuite extends QueryTest {
}
test("unpersist an uncached table will not raise exception") {
- assert(None == lookupCachedData(testData))
+ assert(None == cacheManager.lookupCachedData(testData))
testData.unpersist(true)
- assert(None == lookupCachedData(testData))
+ assert(None == cacheManager.lookupCachedData(testData))
testData.unpersist(false)
- assert(None == lookupCachedData(testData))
+ assert(None == cacheManager.lookupCachedData(testData))
testData.persist()
- assert(None != lookupCachedData(testData))
+ assert(None != cacheManager.lookupCachedData(testData))
testData.unpersist(true)
- assert(None == lookupCachedData(testData))
+ assert(None == cacheManager.lookupCachedData(testData))
testData.unpersist(false)
- assert(None == lookupCachedData(testData))
+ assert(None == cacheManager.lookupCachedData(testData))
}
test("cache table as select") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 825a1862ba..701950f464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -33,7 +33,7 @@ class ColumnExpressionSuite extends QueryTest {
ignore("star qualified by data frame object") {
// This is not yet supported.
- val df = testData.toDF
+ val df = testData.toDataFrame
checkAnswer(df.select(df("*")), df.collect().toSeq)
}
@@ -106,13 +106,13 @@ class ColumnExpressionSuite extends QueryTest {
test("isNull") {
checkAnswer(
- nullStrings.toDF.where($"s".isNull),
+ nullStrings.toDataFrame.where($"s".isNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) eq null))
}
test("isNotNull") {
checkAnswer(
- nullStrings.toDF.where($"s".isNotNull),
+ nullStrings.toDataFrame.where($"s".isNotNull),
nullStrings.collect().toSeq.filter(r => r.getString(1) ne null))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6d7d5aa493..ec3770bc6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -118,19 +118,19 @@ class DataFrameSuite extends QueryTest {
checkAnswer(
arrayData.orderBy('data.getItem(0).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq)
checkAnswer(
arrayData.orderBy('data.getItem(0).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq)
checkAnswer(
arrayData.orderBy('data.getItem(1).asc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq)
checkAnswer(
arrayData.orderBy('data.getItem(1).desc),
- arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
+ arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq)
}
test("limit") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 79713725c0..561db59104 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -59,7 +59,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
test("join operator selection") {
- clearCache()
+ cacheManager.clearCache()
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]),
@@ -93,7 +93,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
test("broadcasted hash join operator selection") {
- clearCache()
+ cacheManager.clearCache()
sql("CACHE TABLE testData")
Seq(
@@ -384,7 +384,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
}
test("broadcasted left semi join operator selection") {
- clearCache()
+ cacheManager.clearCache()
sql("CACHE TABLE testData")
val tmp = conf.autoBroadcastJoinThreshold
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 07c52de377..a7f2faa3ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -101,7 +101,9 @@ class QueryTest extends PlanTest {
}
}
- /** Asserts that a given SchemaRDD will be executed using the given number of cached results. */
+ /**
+ * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
+ */
def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
val planWithCaching = query.queryExecution.withCachedData
val cachedData = planWithCaching collect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4fff99cb3f..c00ae0a856 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -651,8 +651,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Row(values(0).toInt, values(1), values(2).toBoolean, v4)
}
- val schemaRDD1 = applySchema(rowRDD1, schema1)
- schemaRDD1.registerTempTable("applySchema1")
+ val df1 = applySchema(rowRDD1, schema1)
+ df1.registerTempTable("applySchema1")
checkAnswer(
sql("SELECT * FROM applySchema1"),
Row(1, "A1", true, null) ::
@@ -681,8 +681,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
}
- val schemaRDD2 = applySchema(rowRDD2, schema2)
- schemaRDD2.registerTempTable("applySchema2")
+ val df2 = applySchema(rowRDD2, schema2)
+ df2.registerTempTable("applySchema2")
checkAnswer(
sql("SELECT * FROM applySchema2"),
Row(Row(1, true), Map("A1" -> null)) ::
@@ -706,8 +706,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Row(Row(values(0).toInt, values(2).toBoolean), scala.collection.mutable.Map(values(1) -> v4))
}
- val schemaRDD3 = applySchema(rowRDD3, schema2)
- schemaRDD3.registerTempTable("applySchema3")
+ val df3 = applySchema(rowRDD3, schema2)
+ df3.registerTempTable("applySchema3")
checkAnswer(
sql("SELECT f1.f11, f2['D4'] FROM applySchema3"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 9eefe67c04..82dd66916b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -30,11 +30,11 @@ case class TestData(key: Int, value: String)
object TestData {
val testData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF
+ (1 to 100).map(i => TestData(i, i.toString))).toDataFrame
testData.registerTempTable("testData")
val negativeData = TestSQLContext.sparkContext.parallelize(
- (1 to 100).map(i => TestData(-i, (-i).toString))).toDF
+ (1 to 100).map(i => TestData(-i, (-i).toString))).toDataFrame
negativeData.registerTempTable("negativeData")
case class LargeAndSmallInts(a: Int, b: Int)
@@ -45,7 +45,7 @@ object TestData {
LargeAndSmallInts(2147483645, 1) ::
LargeAndSmallInts(2, 2) ::
LargeAndSmallInts(2147483646, 1) ::
- LargeAndSmallInts(3, 2) :: Nil).toDF
+ LargeAndSmallInts(3, 2) :: Nil).toDataFrame
largeAndSmallInts.registerTempTable("largeAndSmallInts")
case class TestData2(a: Int, b: Int)
@@ -56,7 +56,7 @@ object TestData {
TestData2(2, 1) ::
TestData2(2, 2) ::
TestData2(3, 1) ::
- TestData2(3, 2) :: Nil, 2).toDF
+ TestData2(3, 2) :: Nil, 2).toDataFrame
testData2.registerTempTable("testData2")
case class DecimalData(a: BigDecimal, b: BigDecimal)
@@ -68,7 +68,7 @@ object TestData {
DecimalData(2, 1) ::
DecimalData(2, 2) ::
DecimalData(3, 1) ::
- DecimalData(3, 2) :: Nil).toDF
+ DecimalData(3, 2) :: Nil).toDataFrame
decimalData.registerTempTable("decimalData")
case class BinaryData(a: Array[Byte], b: Int)
@@ -78,14 +78,14 @@ object TestData {
BinaryData("22".getBytes(), 5) ::
BinaryData("122".getBytes(), 3) ::
BinaryData("121".getBytes(), 2) ::
- BinaryData("123".getBytes(), 4) :: Nil).toDF
+ BinaryData("123".getBytes(), 4) :: Nil).toDataFrame
binaryData.registerTempTable("binaryData")
case class TestData3(a: Int, b: Option[Int])
val testData3 =
TestSQLContext.sparkContext.parallelize(
TestData3(1, None) ::
- TestData3(2, Some(2)) :: Nil).toDF
+ TestData3(2, Some(2)) :: Nil).toDataFrame
testData3.registerTempTable("testData3")
val emptyTableData = logical.LocalRelation($"a".int, $"b".int)
@@ -98,7 +98,7 @@ object TestData {
UpperCaseData(3, "C") ::
UpperCaseData(4, "D") ::
UpperCaseData(5, "E") ::
- UpperCaseData(6, "F") :: Nil).toDF
+ UpperCaseData(6, "F") :: Nil).toDataFrame
upperCaseData.registerTempTable("upperCaseData")
case class LowerCaseData(n: Int, l: String)
@@ -107,7 +107,7 @@ object TestData {
LowerCaseData(1, "a") ::
LowerCaseData(2, "b") ::
LowerCaseData(3, "c") ::
- LowerCaseData(4, "d") :: Nil).toDF
+ LowerCaseData(4, "d") :: Nil).toDataFrame
lowerCaseData.registerTempTable("lowerCaseData")
case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]])
@@ -161,7 +161,7 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
NullStrings(1, "abc") ::
NullStrings(2, "ABC") ::
- NullStrings(3, null) :: Nil).toDF
+ NullStrings(3, null) :: Nil).toDataFrame
nullStrings.registerTempTable("nullStrings")
case class TableName(tableName: String)
@@ -201,6 +201,6 @@ object TestData {
TestSQLContext.sparkContext.parallelize(
ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true)
:: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false)
- :: Nil).toDF
+ :: Nil).toDataFrame
complexData.registerTempTable("complexData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index c3a3f8ddc3..fe9a69edbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -104,14 +104,14 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be
expectedQueryResult: => Seq[Int]): Unit = {
test(query) {
- val schemaRdd = sql(query)
- val queryExecution = schemaRdd.queryExecution
+ val df = sql(query)
+ val queryExecution = df.queryExecution
assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
- schemaRdd.collect().map(_(0)).toArray
+ df.collect().map(_(0)).toArray
}
- val (readPartitions, readBatches) = schemaRdd.queryExecution.executedPlan.collect {
+ val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
}.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 87c28c334d..4e9472c602 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext._
class DebuggingSuite extends FunSuite {
- test("SchemaRDD.debug()") {
+ test("DataFrame.debug()") {
testData.debug()
}
- test("SchemaRDD.typeCheck()") {
+ test("DataFrame.typeCheck()") {
testData.typeCheck()
}
}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index ef198f846c..5a75326d1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -194,7 +194,7 @@ class JsonSuite extends QueryTest {
}
test("Complex field and type inferring with null in sampling") {
- val jsonSchemaRDD = jsonRDD(jsonNullStruct)
+ val jsonDF = jsonRDD(jsonNullStruct)
val expectedSchema = StructType(
StructField("headers", StructType(
StructField("Charset", StringType, true) ::
@@ -203,8 +203,8 @@ class JsonSuite extends QueryTest {
StructField("ip", StringType, true) ::
StructField("nullstr", StringType, true):: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ assert(expectedSchema === jsonDF.schema)
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select nullstr, headers.Host from jsonTable"),
@@ -213,7 +213,7 @@ class JsonSuite extends QueryTest {
}
test("Primitive field and type inferring") {
- val jsonSchemaRDD = jsonRDD(primitiveFieldAndType)
+ val jsonDF = jsonRDD(primitiveFieldAndType)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -224,9 +224,9 @@ class JsonSuite extends QueryTest {
StructField("null", StringType, true) ::
StructField("string", StringType, true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select * from jsonTable"),
@@ -241,7 +241,7 @@ class JsonSuite extends QueryTest {
}
test("Complex field and type inferring") {
- val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
+ val jsonDF = jsonRDD(complexFieldAndType1)
val expectedSchema = StructType(
StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) ::
@@ -265,9 +265,9 @@ class JsonSuite extends QueryTest {
StructField("field1", ArrayType(IntegerType, false), true) ::
StructField("field2", ArrayType(StringType, false), true) :: Nil), true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
// Access elements of a primitive array.
checkAnswer(
@@ -340,8 +340,8 @@ class JsonSuite extends QueryTest {
}
ignore("Complex field and type inferring (Ignored)") {
- val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(complexFieldAndType1)
+ jsonDF.registerTempTable("jsonTable")
// Right now, "field1" and "field2" are treated as aliases. We should fix it.
checkAnswer(
@@ -358,7 +358,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in primitive field values") {
- val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
+ val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
val expectedSchema = StructType(
StructField("num_bool", StringType, true) ::
@@ -368,9 +368,9 @@ class JsonSuite extends QueryTest {
StructField("num_str", StringType, true) ::
StructField("str_bool", StringType, true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select * from jsonTable"),
@@ -429,8 +429,8 @@ class JsonSuite extends QueryTest {
}
ignore("Type conflict in primitive field values (Ignored)") {
- val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
+ jsonDF.registerTempTable("jsonTable")
// Right now, the analyzer does not promote strings in a boolean expreesion.
// Number and Boolean conflict: resolve the type as boolean in this query.
@@ -463,7 +463,7 @@ class JsonSuite extends QueryTest {
// We should directly cast num_str to DecimalType and also need to do the right type promotion
// in the Project.
checkAnswer(
- jsonSchemaRDD.
+ jsonDF.
where('num_str > Literal(BigDecimal("92233720368547758060"))).
select(('num_str + Literal(1.2)).as("num")),
Row(new java.math.BigDecimal("92233720368547758061.2"))
@@ -482,7 +482,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in complex field values") {
- val jsonSchemaRDD = jsonRDD(complexFieldValueTypeConflict)
+ val jsonDF = jsonRDD(complexFieldValueTypeConflict)
val expectedSchema = StructType(
StructField("array", ArrayType(IntegerType, false), true) ::
@@ -492,9 +492,9 @@ class JsonSuite extends QueryTest {
StructField("field", StringType, true) :: Nil), true) ::
StructField("struct_array", StringType, true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select * from jsonTable"),
@@ -506,7 +506,7 @@ class JsonSuite extends QueryTest {
}
test("Type conflict in array elements") {
- val jsonSchemaRDD = jsonRDD(arrayElementTypeConflict)
+ val jsonDF = jsonRDD(arrayElementTypeConflict)
val expectedSchema = StructType(
StructField("array1", ArrayType(StringType, true), true) ::
@@ -514,9 +514,9 @@ class JsonSuite extends QueryTest {
StructField("field", LongType, true) :: Nil), false), true) ::
StructField("array3", ArrayType(StringType, false), true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select * from jsonTable"),
@@ -534,7 +534,7 @@ class JsonSuite extends QueryTest {
}
test("Handling missing fields") {
- val jsonSchemaRDD = jsonRDD(missingFields)
+ val jsonDF = jsonRDD(missingFields)
val expectedSchema = StructType(
StructField("a", BooleanType, true) ::
@@ -544,16 +544,16 @@ class JsonSuite extends QueryTest {
StructField("field", BooleanType, true) :: Nil), true) ::
StructField("e", StringType, true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
}
test("Loading a JSON dataset from a text file") {
val file = getTempFilePath("json")
val path = file.toString
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
- val jsonSchemaRDD = jsonFile(path)
+ val jsonDF = jsonFile(path)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -564,9 +564,9 @@ class JsonSuite extends QueryTest {
StructField("null", StringType, true) ::
StructField("string", StringType, true) :: Nil)
- assert(expectedSchema === jsonSchemaRDD.schema)
+ assert(expectedSchema === jsonDF.schema)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select * from jsonTable"),
@@ -620,11 +620,11 @@ class JsonSuite extends QueryTest {
StructField("null", StringType, true) ::
StructField("string", StringType, true) :: Nil)
- val jsonSchemaRDD1 = jsonFile(path, schema)
+ val jsonDF1 = jsonFile(path, schema)
- assert(schema === jsonSchemaRDD1.schema)
+ assert(schema === jsonDF1.schema)
- jsonSchemaRDD1.registerTempTable("jsonTable1")
+ jsonDF1.registerTempTable("jsonTable1")
checkAnswer(
sql("select * from jsonTable1"),
@@ -637,11 +637,11 @@ class JsonSuite extends QueryTest {
"this is a simple string.")
)
- val jsonSchemaRDD2 = jsonRDD(primitiveFieldAndType, schema)
+ val jsonDF2 = jsonRDD(primitiveFieldAndType, schema)
- assert(schema === jsonSchemaRDD2.schema)
+ assert(schema === jsonDF2.schema)
- jsonSchemaRDD2.registerTempTable("jsonTable2")
+ jsonDF2.registerTempTable("jsonTable2")
checkAnswer(
sql("select * from jsonTable2"),
@@ -656,8 +656,8 @@ class JsonSuite extends QueryTest {
}
test("SPARK-2096 Correctly parse dot notations") {
- val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(complexFieldAndType2)
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"),
@@ -674,8 +674,8 @@ class JsonSuite extends QueryTest {
}
test("SPARK-3390 Complex arrays") {
- val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(complexFieldAndType2)
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql(
@@ -697,8 +697,8 @@ class JsonSuite extends QueryTest {
}
test("SPARK-3308 Read top level JSON arrays") {
- val jsonSchemaRDD = jsonRDD(jsonArray)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(jsonArray)
+ jsonDF.registerTempTable("jsonTable")
checkAnswer(
sql(
@@ -718,8 +718,8 @@ class JsonSuite extends QueryTest {
val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
- val jsonSchemaRDD = jsonRDD(corruptRecords)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(corruptRecords)
+ jsonDF.registerTempTable("jsonTable")
val schema = StructType(
StructField("_unparsed", StringType, true) ::
@@ -727,7 +727,7 @@ class JsonSuite extends QueryTest {
StructField("b", StringType, true) ::
StructField("c", StringType, true) :: Nil)
- assert(schema === jsonSchemaRDD.schema)
+ assert(schema === jsonDF.schema)
// In HiveContext, backticks should be used to access columns starting with a underscore.
checkAnswer(
@@ -772,8 +772,8 @@ class JsonSuite extends QueryTest {
}
test("SPARK-4068: nulls in arrays") {
- val jsonSchemaRDD = jsonRDD(nullsInArrays)
- jsonSchemaRDD.registerTempTable("jsonTable")
+ val jsonDF = jsonRDD(nullsInArrays)
+ jsonDF.registerTempTable("jsonTable")
val schema = StructType(
StructField("field1",
@@ -787,7 +787,7 @@ class JsonSuite extends QueryTest {
StructField("field4",
ArrayType(ArrayType(ArrayType(IntegerType, false), true), false), true) :: Nil)
- assert(schema === jsonSchemaRDD.schema)
+ assert(schema === jsonDF.schema)
checkAnswer(
sql(
@@ -802,7 +802,7 @@ class JsonSuite extends QueryTest {
)
}
- test("SPARK-4228 SchemaRDD to JSON")
+ test("SPARK-4228 DataFrame to JSON")
{
val schema1 = StructType(
StructField("f1", IntegerType, false) ::
@@ -819,10 +819,10 @@ class JsonSuite extends QueryTest {
Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5)
}
- val schemaRDD1 = applySchema(rowRDD1, schema1)
- schemaRDD1.registerTempTable("applySchema1")
- val schemaRDD2 = schemaRDD1.toDF
- val result = schemaRDD2.toJSON.collect()
+ val df1 = applySchema(rowRDD1, schema1)
+ df1.registerTempTable("applySchema1")
+ val df2 = df1.toDataFrame
+ val result = df2.toJSON.collect()
assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
@@ -840,16 +840,16 @@ class JsonSuite extends QueryTest {
Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
}
- val schemaRDD3 = applySchema(rowRDD2, schema2)
- schemaRDD3.registerTempTable("applySchema2")
- val schemaRDD4 = schemaRDD3.toDF
- val result2 = schemaRDD4.toJSON.collect()
+ val df3 = applySchema(rowRDD2, schema2)
+ df3.registerTempTable("applySchema2")
+ val df4 = df3.toDataFrame
+ val result2 = df4.toJSON.collect()
assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
- val jsonSchemaRDD = jsonRDD(primitiveFieldAndType)
- val primTable = jsonRDD(jsonSchemaRDD.toJSON)
+ val jsonDF = jsonRDD(primitiveFieldAndType)
+ val primTable = jsonRDD(jsonDF.toJSON)
primTable.registerTempTable("primativeTable")
checkAnswer(
sql("select * from primativeTable"),
@@ -861,8 +861,8 @@ class JsonSuite extends QueryTest {
"this is a simple string.")
)
- val complexJsonSchemaRDD = jsonRDD(complexFieldAndType1)
- val compTable = jsonRDD(complexJsonSchemaRDD.toJSON)
+ val complexJsonDF = jsonRDD(complexFieldAndType1)
+ val compTable = jsonRDD(complexJsonDF.toJSON)
compTable.registerTempTable("complexTable")
// Access elements of a primitive array.
checkAnswer(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 8e70ae8f56..822864f8ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -396,7 +396,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
}
- clearCache()
+ cacheManager.clearCache()
loadedTables.clear()
catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 42bc8a0b67..91af35f096 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -239,7 +239,7 @@ case class InsertIntoHiveTable(
}
// Invalidate the cache.
- sqlContext.invalidateCache(table)
+ sqlContext.cacheManager.invalidateCache(table)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 91f9da35ab..4814cb7ebf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -54,7 +54,7 @@ case class DropTable(
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
try {
- hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+ hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
} catch {
// This table's metadata is not in
case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 5775d83fcb..4dd96bd5a1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
+ testData.toDataFrame.collect().toSeq ++ testData.toDataFrame.collect().toSeq
)
// Now overwrite.
@@ -82,8 +82,8 @@ class InsertIntoHiveTableSuite extends QueryTest {
val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
val rowRDD = TestHive.sparkContext.parallelize(
(1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
- val schemaRDD = applySchema(rowRDD, schema)
- schemaRDD.registerTempTable("tableWithMapValue")
+ val df = applySchema(rowRDD, schema)
+ df.registerTempTable("tableWithMapValue")
sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
@@ -127,8 +127,8 @@ class InsertIntoHiveTableSuite extends QueryTest {
val schema = StructType(Seq(
StructField("a", ArrayType(StringType, containsNull = false))))
val rowRDD = TestHive.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
- val schemaRDD = applySchema(rowRDD, schema)
- schemaRDD.registerTempTable("tableWithArrayValue")
+ val df = applySchema(rowRDD, schema)
+ df.registerTempTable("tableWithArrayValue")
sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
@@ -144,8 +144,8 @@ class InsertIntoHiveTableSuite extends QueryTest {
StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
val rowRDD = TestHive.sparkContext.parallelize(
(1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
- val schemaRDD = applySchema(rowRDD, schema)
- schemaRDD.registerTempTable("tableWithMapValue")
+ val df = applySchema(rowRDD, schema)
+ df.registerTempTable("tableWithMapValue")
sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
@@ -161,8 +161,8 @@ class InsertIntoHiveTableSuite extends QueryTest {
StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
val rowRDD = TestHive.sparkContext.parallelize(
(1 to 100).map(i => Row(Row(s"value$i"))))
- val schemaRDD = applySchema(rowRDD, schema)
- schemaRDD.registerTempTable("tableWithStructValue")
+ val df = applySchema(rowRDD, schema)
+ df.registerTempTable("tableWithStructValue")
sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index d67b00bc9d..0c8a113c75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -368,7 +368,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s")
}
- test("SchemaRDD toString") {
+ test("DataFrame toString") {
sql("SHOW TABLES").toString
sql("SELECT * FROM src").toString
}
@@ -479,7 +479,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
explanation.contains("== Physical Plan ==")
}
- test("SPARK-1704: Explain commands as a SchemaRDD") {
+ test("SPARK-1704: Explain commands as a DataFrame") {
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
val rdd = sql("explain select key, count(value) from src group by key")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7f9f1ac7cd..faa7357b90 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -222,7 +222,7 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT distinct key FROM src order by key").collect().toSeq)
}
- test("SPARK-4963 SchemaRDD sample on mutable row return wrong result") {
+ test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
sql("SELECT * FROM src WHERE key % 2 = 0")
.sample(withReplacement = false, fraction = 0.3)
.registerTempTable("sampled")