From c8e934ef3cd06f02f9a2946e96a1a52293c22490 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 28 Jan 2015 12:10:01 -0800 Subject: [SPARK-5447][SQL] Replaced reference to SchemaRDD with DataFrame. and [SPARK-5448][SQL] Make CacheManager a concrete class and field in SQLContext Author: Reynold Xin Closes #4242 from rxin/sqlCleanup and squashes the following commits: e351cb2 [Reynold Xin] Fixed toDataFrame. 6545c42 [Reynold Xin] More changes. 728c017 [Reynold Xin] [SPARK-5447][SQL] Replaced reference to SchemaRDD with DataFrame. --- .../spark/examples/mllib/DatasetExample.scala | 2 +- .../org/apache/spark/ml/recommendation/ALS.scala | 2 +- .../org/apache/spark/mllib/linalg/Vectors.scala | 2 +- .../classification/LogisticRegressionSuite.scala | 2 +- .../apache/spark/ml/recommendation/ALSSuite.scala | 2 +- .../spark/ml/tuning/CrossValidatorSuite.scala | 2 +- .../scala/org/apache/spark/repl/ReplSuite.scala | 6 +- sql/README.md | 2 +- .../org/apache/spark/sql/types/dataTypes.scala | 6 +- .../scala/org/apache/spark/sql/CacheManager.scala | 22 ++-- .../scala/org/apache/spark/sql/DataFrame.scala | 10 +- .../scala/org/apache/spark/sql/SQLContext.scala | 102 ++++++++++-------- .../org/apache/spark/sql/UdfRegistration.scala | 4 +- .../apache/spark/sql/execution/debug/package.scala | 4 +- .../apache/spark/sql/parquet/ParquetRelation.scala | 6 +- .../org/apache/spark/sql/parquet/ParquetTest.scala | 10 +- .../org/apache/spark/sql/test/TestSQLContext.scala | 4 +- .../org/apache/spark/sql/CachedTableSuite.scala | 12 +-- .../apache/spark/sql/ColumnExpressionSuite.scala | 6 +- .../org/apache/spark/sql/DataFrameSuite.scala | 8 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 6 +- .../scala/org/apache/spark/sql/QueryTest.scala | 4 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 +-- .../test/scala/org/apache/spark/sql/TestData.scala | 22 ++-- .../sql/columnar/PartitionBatchPruningSuite.scala | 8 +- .../spark/sql/execution/debug/DebuggingSuite.scala | 4 +- .../org/apache/spark/sql/json/JsonSuite.scala | 120 ++++++++++----------- .../scala/org/apache/spark/sql/hive/TestHive.scala | 2 +- .../sql/hive/execution/InsertIntoHiveTable.scala | 2 +- .../apache/spark/sql/hive/execution/commands.scala | 2 +- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 18 ++-- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 33 files changed, 217 insertions(+), 203 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala index f229a58985..ab58375649 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala @@ -81,7 +81,7 @@ object DatasetExample { println(s"Loaded ${origData.count()} instances from file: ${params.input}") // Convert input data to DataFrame explicitly. 
- val df: DataFrame = origData.toDF + val df: DataFrame = origData.toDataFrame println(s"Inferred schema:\n${df.schema.prettyJson}") println(s"Converted to DataFrame with ${df.count()} records") diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index f6437c7fbc..f0bea5f469 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -148,7 +148,7 @@ class ALSModel private[ml] ( } private object ALSModel { - /** Case class to convert factors to SchemaRDDs */ + /** Case class to convert factors to [[DataFrame]]s */ private case class Factor(id: Int, features: Seq[Float]) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 2834ea75ce..31c33f1bf6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -110,7 +110,7 @@ sealed trait Vector extends Serializable { /** * User-defined type for [[Vector]] which allows easy interaction with SQL - * via [[org.apache.spark.sql.SchemaRDD]]. + * via [[org.apache.spark.sql.DataFrame]]. */ private[spark] class VectorUDT extends UserDefinedType[Vector] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 1912afce93..33e40dc741 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -31,7 +31,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() sqlContext = new SQLContext(sc) - dataset = sqlContext.createSchemaRDD( + dataset = sqlContext.createDataFrame( sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2)) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 58289acdbc..9da253c61d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -350,7 +350,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { numItemBlocks: Int = 3, targetRMSE: Double = 0.05): Unit = { val sqlContext = this.sqlContext - import sqlContext.createSchemaRDD + import sqlContext.createDataFrame val als = new ALS() .setRank(rank) .setRegParam(regParam) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 74104fa7a6..761ea821ef 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -32,7 +32,7 @@ class CrossValidatorSuite extends FunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() val sqlContext = new SQLContext(sc) - dataset = sqlContext.createSchemaRDD( + dataset = sqlContext.createDataFrame( sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2)) } diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 91c9c52c3c..e594ad868e 100644 --- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -255,14 +255,14 @@ class ReplSuite extends FunSuite { assertDoesNotContain("Exception", output) } - test("SPARK-2576 importing SQLContext.createSchemaRDD.") { + test("SPARK-2576 importing SQLContext.createDataFrame.") { // We need to use local-cluster to test this case. val output = runInterpreter("local-cluster[1,1,512]", """ |val sqlContext = new org.apache.spark.sql.SQLContext(sc) - |import sqlContext.createSchemaRDD + |import sqlContext.createDataFrame |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDataFrame.collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) diff --git a/sql/README.md b/sql/README.md index d058a6b011..61a20916a9 100644 --- a/sql/README.md +++ b/sql/README.md @@ -44,7 +44,7 @@ Type in expressions to have them evaluated. Type :help for more information. scala> val query = sql("SELECT * FROM (SELECT * FROM src) a") -query: org.apache.spark.sql.SchemaRDD = +query: org.apache.spark.sql.DataFrame = == Query Plan == == Physical Plan == HiveTableScan [key#10,value#11], (MetastoreRelation default, src, None), None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index 9f30f40a17..6ab99aa388 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -930,13 +930,13 @@ case class MapType( * * This interface allows a user to make their own classes more interoperable with SparkSQL; * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create - * a SchemaRDD which has class X in the schema. + * a `DataFrame` which has class X in the schema. * * For SparkSQL to recognize UDTs, the UDT must be annotated with * [[SQLUserDefinedType]]. * - * The conversion via `serialize` occurs when instantiating a `SchemaRDD` from another RDD. - * The conversion via `deserialize` occurs when reading from a `SchemaRDD`. + * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD. + * The conversion via `deserialize` occurs when reading from a `DataFrame`. */ @DeveloperApi abstract class UserDefinedType[UserType] extends DataType with Serializable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index bc22f68833..f1949aa5dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.util.concurrent.locks.ReentrantReadWriteLock +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.storage.StorageLevel @@ -32,9 +33,10 @@ private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryR * results when subsequent queries are executed. Data is cached using byte buffers stored in an * InMemoryRelation. 
This relation is automatically substituted query plans that return the * `sameResult` as the originally cached query. + * + * Internal to Spark SQL. */ -private[sql] trait CacheManager { - self: SQLContext => +private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { @transient private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData] @@ -43,13 +45,13 @@ private[sql] trait CacheManager { private val cacheLock = new ReentrantReadWriteLock /** Returns true if the table is currently cached in-memory. */ - def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty + def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty /** Caches the specified table in-memory. */ - def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName)) + def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName)) /** Removes the specified table from the in-memory cache. */ - def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName)) + def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName)) /** Acquires a read lock on the cache for the duration of `f`. */ private def readLock[A](f: => A): A = { @@ -91,15 +93,15 @@ private[sql] trait CacheManager { CachedData( planToCache, InMemoryRelation( - conf.useCompression, - conf.columnBatchSize, + sqlContext.conf.useCompression, + sqlContext.conf.columnBatchSize, storageLevel, query.queryExecution.executedPlan, tableName)) } } - /** Removes the data for the given SchemaRDD from the cache */ + /** Removes the data for the given [[DataFrame]] from the cache */ private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock { val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) @@ -108,7 +110,7 @@ private[sql] trait CacheManager { cachedData.remove(dataIndex) } - /** Tries to remove the data for the given SchemaRDD from the cache if it's cached */ + /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */ private[sql] def tryUncacheQuery( query: DataFrame, blocking: Boolean = true): Boolean = writeLock { @@ -122,7 +124,7 @@ private[sql] trait CacheManager { found } - /** Optionally returns cached data for the given SchemaRDD */ + /** Optionally returns cached data for the given [[DataFrame]] */ private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock { lookupCachedData(query.queryExecution.analyzed) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3198215b2c..ff59cbf3c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -106,7 +106,7 @@ class DataFrame protected[sql]( * An implicit conversion function internal to this class for us to avoid doing * "new DataFrame(...)" everywhere. */ - private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = { + private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = { new DataFrame(sqlContext, logicalPlan, true) } @@ -130,7 +130,7 @@ class DataFrame protected[sql]( /** * Return the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala. 
*/ - def toDF: DataFrame = this + def toDataFrame: DataFrame = this /** Return the schema of this [[DataFrame]]. */ override def schema: StructType = queryExecution.analyzed.schema @@ -496,17 +496,17 @@ class DataFrame protected[sql]( } override def persist(): this.type = { - sqlContext.cacheQuery(this) + sqlContext.cacheManager.cacheQuery(this) this } override def persist(newLevel: StorageLevel): this.type = { - sqlContext.cacheQuery(this, None, newLevel) + sqlContext.cacheManager.cacheQuery(this, None, newLevel) this } override def unpersist(blocking: Boolean): this.type = { - sqlContext.tryUncacheQuery(this, blocking) + sqlContext.cacheManager.tryUncacheQuery(this, blocking) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5030e689c3..d56d4052a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -51,7 +51,6 @@ import org.apache.spark.util.Utils @AlphaComponent class SQLContext(@transient val sparkContext: SparkContext) extends org.apache.spark.Logging - with CacheManager with Serializable { self => @@ -117,12 +116,57 @@ class SQLContext(@transient val sparkContext: SparkContext) case _ => } + protected[sql] val cacheManager = new CacheManager(this) + + /** + * A collection of methods that are considered experimental, but can be used to hook into + * the query planner for advanced functionalities. + */ + val experimental: ExperimentalMethods = new ExperimentalMethods(this) + + /** + * A collection of methods for registering user-defined functions (UDF). + * + * The following example registers a Scala closure as UDF: + * {{{ + * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1) + * }}} + * + * The following example registers a UDF in Java: + * {{{ + * sqlContext.udf().register("myUDF", + * new UDF2() { + * @Override + * public String call(Integer arg1, String arg2) { + * return arg2 + arg1; + * } + * }, DataTypes.StringType); + * }}} + * + * Or, to use Java 8 lambda syntax: + * {{{ + * sqlContext.udf().register("myUDF", + * (Integer arg1, String arg2) -> arg2 + arg1), + * DataTypes.StringType); + * }}} + */ + val udf: UDFRegistration = new UDFRegistration(this) + + /** Returns true if the table is currently cached in-memory. */ + def isCached(tableName: String): Boolean = cacheManager.isCached(tableName) + + /** Caches the specified table in-memory. */ + def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName) + + /** Removes the specified table from the in-memory cache. */ + def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) + /** - * Creates a SchemaRDD from an RDD of case classes. + * Creates a DataFrame from an RDD of case classes. * * @group userf */ - implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = { + implicit def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = { SparkPlan.currentContext.set(self) val attributeSeq = ScalaReflection.attributesFor[A] val schema = StructType.fromAttributes(attributeSeq) @@ -133,7 +177,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** * Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]]. 
*/ - def baseRelationToSchemaRDD(baseRelation: BaseRelation): DataFrame = { + def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { new DataFrame(this, LogicalRelation(baseRelation)) } @@ -155,13 +199,13 @@ class SQLContext(@transient val sparkContext: SparkContext) * val people = * sc.textFile("examples/src/main/resources/people.txt").map( * _.split(",")).map(p => Row(p(0), p(1).trim.toInt)) - * val peopleSchemaRDD = sqlContext. applySchema(people, schema) - * peopleSchemaRDD.printSchema + * val dataFrame = sqlContext. applySchema(people, schema) + * dataFrame.printSchema * // root * // |-- name: string (nullable = false) * // |-- age: integer (nullable = true) * - * peopleSchemaRDD.registerTempTable("people") + * dataFrame.registerTempTable("people") * sqlContext.sql("select name from people").collect.foreach(println) * }}} * @@ -169,7 +213,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ @DeveloperApi def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = { - // TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied + // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. val logicalPlan = LogicalRDD(schema.toAttributes, rowRDD)(self) new DataFrame(this, logicalPlan) @@ -309,12 +353,12 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def dropTempTable(tableName: String): Unit = { - tryUncacheQuery(table(tableName)) + cacheManager.tryUncacheQuery(table(tableName)) catalog.unregisterTable(Seq(tableName)) } /** - * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is + * Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is * used for SQL parsing can be configured with 'spark.sql.dialect'. * * @group userf @@ -327,44 +371,10 @@ class SQLContext(@transient val sparkContext: SparkContext) } } - /** Returns the specified table as a SchemaRDD */ + /** Returns the specified table as a [[DataFrame]]. */ def table(tableName: String): DataFrame = new DataFrame(this, catalog.lookupRelation(Seq(tableName))) - /** - * A collection of methods that are considered experimental, but can be used to hook into - * the query planner for advanced functionalities. - */ - val experimental: ExperimentalMethods = new ExperimentalMethods(this) - - /** - * A collection of methods for registering user-defined functions (UDF). 
- * - * The following example registers a Scala closure as UDF: - * {{{ - * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1) - * }}} - * - * The following example registers a UDF in Java: - * {{{ - * sqlContext.udf().register("myUDF", - * new UDF2() { - * @Override - * public String call(Integer arg1, String arg2) { - * return arg2 + arg1; - * } - * }, DataTypes.StringType); - * }}} - * - * Or, to use Java 8 lambda syntax: - * {{{ - * sqlContext.udf().register("myUDF", - * (Integer arg1, String arg2) -> arg2 + arg1), - * DataTypes.StringType); - * }}} - */ - val udf: UDFRegistration = new UDFRegistration(this) - protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext: SparkContext = self.sparkContext @@ -455,7 +465,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected class QueryExecution(val logical: LogicalPlan) { lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical)) - lazy val withCachedData: LogicalPlan = useCachedData(analyzed) + lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed) lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData) // TODO: Don't just pick the first one... diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala index 2e9d037f93..1beb19437a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala @@ -21,7 +21,7 @@ import java.util.{List => JList, Map => JMap} import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.Accumulator +import org.apache.spark.{Accumulator, Logging} import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.java._ @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.DataType /** * Functions for registering user-defined functions. */ -class UDFRegistration (sqlContext: SQLContext) extends org.apache.spark.Logging { +class UDFRegistration(sqlContext: SQLContext) extends Logging { private val functionRegistry = sqlContext.functionRegistry diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index aeb0960e87..5cc67cdd13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -39,7 +39,7 @@ package object debug { /** * :: DeveloperApi :: - * Augments SchemaRDDs with debug methods. + * Augments [[DataFrame]]s with debug methods. */ @DeveloperApi implicit class DebugQuery(query: DataFrame) { @@ -166,7 +166,7 @@ package object debug { /** * :: DeveloperApi :: - * Augments SchemaRDDs with debug methods. + * Augments [[DataFrame]]s with debug methods. 
*/ @DeveloperApi private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index cde5160149..a54485e719 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -26,7 +26,7 @@ import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.metadata.CompressionCodecName import parquet.schema.MessageType -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} @@ -34,8 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati /** * Relation that consists of data stored in a Parquet columnar format. * - * Users should interact with parquet files though a SchemaRDD, created by a [[SQLContext]] instead - * of using this class directly. + * Users should interact with parquet files though a [[DataFrame]], created by a [[SQLContext]] + * instead of using this class directly. * * {{{ * val parquetRDD = sqlContext.parquetFile("path/to/parquet.file") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index 0b312ef51d..9d6c529574 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -95,8 +95,8 @@ trait ParquetTest { } /** - * Writes `data` to a Parquet file and reads it back as a SchemaRDD, which is then passed to `f`. - * The Parquet file will be deleted after `f` returns. + * Writes `data` to a Parquet file and reads it back as a [[DataFrame]], + * which is then passed to `f`. The Parquet file will be deleted after `f` returns. */ protected def withParquetRDD[T <: Product: ClassTag: TypeTag] (data: Seq[T]) @@ -112,9 +112,9 @@ trait ParquetTest { } /** - * Writes `data` to a Parquet file, reads it back as a SchemaRDD and registers it as a temporary - * table named `tableName`, then call `f`. The temporary table together with the Parquet file will - * be dropped/deleted after `f` returns. + * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a + * temporary table named `tableName`, then call `f`. The temporary table together with the + * Parquet file will be dropped/deleted after `f` returns. */ protected def withParquetTable[T <: Product: ClassTag: TypeTag] (data: Seq[T], tableName: String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala index 2564c849b8..906455dd40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -37,8 +37,8 @@ object TestSQLContext } /** - * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to - * construct SchemaRDD directly out of local data without relying on implicits. + * Turn a logical plan into a [[DataFrame]]. 
This should be removed once we have an easier way to + * construct [[DataFrame]] directly out of local data without relying on implicits. */ protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = { new DataFrame(this, plan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 34763156a6..e1e96926cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -51,17 +51,17 @@ class CachedTableSuite extends QueryTest { } test("unpersist an uncached table will not raise exception") { - assert(None == lookupCachedData(testData)) + assert(None == cacheManager.lookupCachedData(testData)) testData.unpersist(true) - assert(None == lookupCachedData(testData)) + assert(None == cacheManager.lookupCachedData(testData)) testData.unpersist(false) - assert(None == lookupCachedData(testData)) + assert(None == cacheManager.lookupCachedData(testData)) testData.persist() - assert(None != lookupCachedData(testData)) + assert(None != cacheManager.lookupCachedData(testData)) testData.unpersist(true) - assert(None == lookupCachedData(testData)) + assert(None == cacheManager.lookupCachedData(testData)) testData.unpersist(false) - assert(None == lookupCachedData(testData)) + assert(None == cacheManager.lookupCachedData(testData)) } test("cache table as select") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 825a1862ba..701950f464 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -33,7 +33,7 @@ class ColumnExpressionSuite extends QueryTest { ignore("star qualified by data frame object") { // This is not yet supported. 
- val df = testData.toDF + val df = testData.toDataFrame checkAnswer(df.select(df("*")), df.collect().toSeq) } @@ -106,13 +106,13 @@ class ColumnExpressionSuite extends QueryTest { test("isNull") { checkAnswer( - nullStrings.toDF.where($"s".isNull), + nullStrings.toDataFrame.where($"s".isNull), nullStrings.collect().toSeq.filter(r => r.getString(1) eq null)) } test("isNotNull") { checkAnswer( - nullStrings.toDF.where($"s".isNotNull), + nullStrings.toDataFrame.where($"s".isNotNull), nullStrings.collect().toSeq.filter(r => r.getString(1) ne null)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 6d7d5aa493..ec3770bc6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -118,19 +118,19 @@ class DataFrameSuite extends QueryTest { checkAnswer( arrayData.orderBy('data.getItem(0).asc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq) + arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).toSeq) checkAnswer( arrayData.orderBy('data.getItem(0).desc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq) + arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(0)).reverse.toSeq) checkAnswer( arrayData.orderBy('data.getItem(1).asc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq) + arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).toSeq) checkAnswer( arrayData.orderBy('data.getItem(1).desc), - arrayData.toDF.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq) + arrayData.toDataFrame.collect().sortBy(_.getAs[Seq[Int]](0)(1)).reverse.toSeq) } test("limit") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 79713725c0..561db59104 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -59,7 +59,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { } test("join operator selection") { - clearCache() + cacheManager.clearCache() Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]), @@ -93,7 +93,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { } test("broadcasted hash join operator selection") { - clearCache() + cacheManager.clearCache() sql("CACHE TABLE testData") Seq( @@ -384,7 +384,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { } test("broadcasted left semi join operator selection") { - clearCache() + cacheManager.clearCache() sql("CACHE TABLE testData") val tmp = conf.autoBroadcastJoinThreshold diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 07c52de377..a7f2faa3ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -101,7 +101,9 @@ class QueryTest extends PlanTest { } } - /** Asserts that a given SchemaRDD will be executed using the given number of cached results. */ + /** + * Asserts that a given [[DataFrame]] will be executed using the given number of cached results. 
+ */ def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = { val planWithCaching = query.queryExecution.withCachedData val cachedData = planWithCaching collect { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4fff99cb3f..c00ae0a856 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -651,8 +651,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Row(values(0).toInt, values(1), values(2).toBoolean, v4) } - val schemaRDD1 = applySchema(rowRDD1, schema1) - schemaRDD1.registerTempTable("applySchema1") + val df1 = applySchema(rowRDD1, schema1) + df1.registerTempTable("applySchema1") checkAnswer( sql("SELECT * FROM applySchema1"), Row(1, "A1", true, null) :: @@ -681,8 +681,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4)) } - val schemaRDD2 = applySchema(rowRDD2, schema2) - schemaRDD2.registerTempTable("applySchema2") + val df2 = applySchema(rowRDD2, schema2) + df2.registerTempTable("applySchema2") checkAnswer( sql("SELECT * FROM applySchema2"), Row(Row(1, true), Map("A1" -> null)) :: @@ -706,8 +706,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Row(Row(values(0).toInt, values(2).toBoolean), scala.collection.mutable.Map(values(1) -> v4)) } - val schemaRDD3 = applySchema(rowRDD3, schema2) - schemaRDD3.registerTempTable("applySchema3") + val df3 = applySchema(rowRDD3, schema2) + df3.registerTempTable("applySchema3") checkAnswer( sql("SELECT f1.f11, f2['D4'] FROM applySchema3"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 9eefe67c04..82dd66916b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -30,11 +30,11 @@ case class TestData(key: Int, value: String) object TestData { val testData = TestSQLContext.sparkContext.parallelize( - (1 to 100).map(i => TestData(i, i.toString))).toDF + (1 to 100).map(i => TestData(i, i.toString))).toDataFrame testData.registerTempTable("testData") val negativeData = TestSQLContext.sparkContext.parallelize( - (1 to 100).map(i => TestData(-i, (-i).toString))).toDF + (1 to 100).map(i => TestData(-i, (-i).toString))).toDataFrame negativeData.registerTempTable("negativeData") case class LargeAndSmallInts(a: Int, b: Int) @@ -45,7 +45,7 @@ object TestData { LargeAndSmallInts(2147483645, 1) :: LargeAndSmallInts(2, 2) :: LargeAndSmallInts(2147483646, 1) :: - LargeAndSmallInts(3, 2) :: Nil).toDF + LargeAndSmallInts(3, 2) :: Nil).toDataFrame largeAndSmallInts.registerTempTable("largeAndSmallInts") case class TestData2(a: Int, b: Int) @@ -56,7 +56,7 @@ object TestData { TestData2(2, 1) :: TestData2(2, 2) :: TestData2(3, 1) :: - TestData2(3, 2) :: Nil, 2).toDF + TestData2(3, 2) :: Nil, 2).toDataFrame testData2.registerTempTable("testData2") case class DecimalData(a: BigDecimal, b: BigDecimal) @@ -68,7 +68,7 @@ object TestData { DecimalData(2, 1) :: DecimalData(2, 2) :: DecimalData(3, 1) :: - DecimalData(3, 2) :: Nil).toDF + DecimalData(3, 2) :: Nil).toDataFrame decimalData.registerTempTable("decimalData") case class BinaryData(a: Array[Byte], b: Int) @@ -78,14 +78,14 @@ object TestData { BinaryData("22".getBytes(), 5) :: BinaryData("122".getBytes(), 3) :: 
BinaryData("121".getBytes(), 2) :: - BinaryData("123".getBytes(), 4) :: Nil).toDF + BinaryData("123".getBytes(), 4) :: Nil).toDataFrame binaryData.registerTempTable("binaryData") case class TestData3(a: Int, b: Option[Int]) val testData3 = TestSQLContext.sparkContext.parallelize( TestData3(1, None) :: - TestData3(2, Some(2)) :: Nil).toDF + TestData3(2, Some(2)) :: Nil).toDataFrame testData3.registerTempTable("testData3") val emptyTableData = logical.LocalRelation($"a".int, $"b".int) @@ -98,7 +98,7 @@ object TestData { UpperCaseData(3, "C") :: UpperCaseData(4, "D") :: UpperCaseData(5, "E") :: - UpperCaseData(6, "F") :: Nil).toDF + UpperCaseData(6, "F") :: Nil).toDataFrame upperCaseData.registerTempTable("upperCaseData") case class LowerCaseData(n: Int, l: String) @@ -107,7 +107,7 @@ object TestData { LowerCaseData(1, "a") :: LowerCaseData(2, "b") :: LowerCaseData(3, "c") :: - LowerCaseData(4, "d") :: Nil).toDF + LowerCaseData(4, "d") :: Nil).toDataFrame lowerCaseData.registerTempTable("lowerCaseData") case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]]) @@ -161,7 +161,7 @@ object TestData { TestSQLContext.sparkContext.parallelize( NullStrings(1, "abc") :: NullStrings(2, "ABC") :: - NullStrings(3, null) :: Nil).toDF + NullStrings(3, null) :: Nil).toDataFrame nullStrings.registerTempTable("nullStrings") case class TableName(tableName: String) @@ -201,6 +201,6 @@ object TestData { TestSQLContext.sparkContext.parallelize( ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true) :: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false) - :: Nil).toDF + :: Nil).toDataFrame complexData.registerTempTable("complexData") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala index c3a3f8ddc3..fe9a69edbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala @@ -104,14 +104,14 @@ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with Be expectedQueryResult: => Seq[Int]): Unit = { test(query) { - val schemaRdd = sql(query) - val queryExecution = schemaRdd.queryExecution + val df = sql(query) + val queryExecution = df.queryExecution assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") { - schemaRdd.collect().map(_(0)).toArray + df.collect().map(_(0)).toArray } - val (readPartitions, readBatches) = schemaRdd.queryExecution.executedPlan.collect { + val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect { case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value) }.head diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 87c28c334d..4e9472c602 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -23,11 +23,11 @@ import org.apache.spark.sql.TestData._ import org.apache.spark.sql.test.TestSQLContext._ class DebuggingSuite extends FunSuite { - test("SchemaRDD.debug()") { + test("DataFrame.debug()") { testData.debug() } - test("SchemaRDD.typeCheck()") { + test("DataFrame.typeCheck()") { testData.typeCheck() } } \ No newline at end of file diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index ef198f846c..5a75326d1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -194,7 +194,7 @@ class JsonSuite extends QueryTest { } test("Complex field and type inferring with null in sampling") { - val jsonSchemaRDD = jsonRDD(jsonNullStruct) + val jsonDF = jsonRDD(jsonNullStruct) val expectedSchema = StructType( StructField("headers", StructType( StructField("Charset", StringType, true) :: @@ -203,8 +203,8 @@ class JsonSuite extends QueryTest { StructField("ip", StringType, true) :: StructField("nullstr", StringType, true):: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + assert(expectedSchema === jsonDF.schema) + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select nullstr, headers.Host from jsonTable"), @@ -213,7 +213,7 @@ class JsonSuite extends QueryTest { } test("Primitive field and type inferring") { - val jsonSchemaRDD = jsonRDD(primitiveFieldAndType) + val jsonDF = jsonRDD(primitiveFieldAndType) val expectedSchema = StructType( StructField("bigInteger", DecimalType.Unlimited, true) :: @@ -224,9 +224,9 @@ class JsonSuite extends QueryTest { StructField("null", StringType, true) :: StructField("string", StringType, true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select * from jsonTable"), @@ -241,7 +241,7 @@ class JsonSuite extends QueryTest { } test("Complex field and type inferring") { - val jsonSchemaRDD = jsonRDD(complexFieldAndType1) + val jsonDF = jsonRDD(complexFieldAndType1) val expectedSchema = StructType( StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) :: @@ -265,9 +265,9 @@ class JsonSuite extends QueryTest { StructField("field1", ArrayType(IntegerType, false), true) :: StructField("field2", ArrayType(StringType, false), true) :: Nil), true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") // Access elements of a primitive array. checkAnswer( @@ -340,8 +340,8 @@ class JsonSuite extends QueryTest { } ignore("Complex field and type inferring (Ignored)") { - val jsonSchemaRDD = jsonRDD(complexFieldAndType1) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(complexFieldAndType1) + jsonDF.registerTempTable("jsonTable") // Right now, "field1" and "field2" are treated as aliases. We should fix it. 
checkAnswer( @@ -358,7 +358,7 @@ class JsonSuite extends QueryTest { } test("Type conflict in primitive field values") { - val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict) + val jsonDF = jsonRDD(primitiveFieldValueTypeConflict) val expectedSchema = StructType( StructField("num_bool", StringType, true) :: @@ -368,9 +368,9 @@ class JsonSuite extends QueryTest { StructField("num_str", StringType, true) :: StructField("str_bool", StringType, true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select * from jsonTable"), @@ -429,8 +429,8 @@ class JsonSuite extends QueryTest { } ignore("Type conflict in primitive field values (Ignored)") { - val jsonSchemaRDD = jsonRDD(primitiveFieldValueTypeConflict) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(primitiveFieldValueTypeConflict) + jsonDF.registerTempTable("jsonTable") // Right now, the analyzer does not promote strings in a boolean expreesion. // Number and Boolean conflict: resolve the type as boolean in this query. @@ -463,7 +463,7 @@ class JsonSuite extends QueryTest { // We should directly cast num_str to DecimalType and also need to do the right type promotion // in the Project. checkAnswer( - jsonSchemaRDD. + jsonDF. where('num_str > Literal(BigDecimal("92233720368547758060"))). select(('num_str + Literal(1.2)).as("num")), Row(new java.math.BigDecimal("92233720368547758061.2")) @@ -482,7 +482,7 @@ class JsonSuite extends QueryTest { } test("Type conflict in complex field values") { - val jsonSchemaRDD = jsonRDD(complexFieldValueTypeConflict) + val jsonDF = jsonRDD(complexFieldValueTypeConflict) val expectedSchema = StructType( StructField("array", ArrayType(IntegerType, false), true) :: @@ -492,9 +492,9 @@ class JsonSuite extends QueryTest { StructField("field", StringType, true) :: Nil), true) :: StructField("struct_array", StringType, true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select * from jsonTable"), @@ -506,7 +506,7 @@ class JsonSuite extends QueryTest { } test("Type conflict in array elements") { - val jsonSchemaRDD = jsonRDD(arrayElementTypeConflict) + val jsonDF = jsonRDD(arrayElementTypeConflict) val expectedSchema = StructType( StructField("array1", ArrayType(StringType, true), true) :: @@ -514,9 +514,9 @@ class JsonSuite extends QueryTest { StructField("field", LongType, true) :: Nil), false), true) :: StructField("array3", ArrayType(StringType, false), true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select * from jsonTable"), @@ -534,7 +534,7 @@ class JsonSuite extends QueryTest { } test("Handling missing fields") { - val jsonSchemaRDD = jsonRDD(missingFields) + val jsonDF = jsonRDD(missingFields) val expectedSchema = StructType( StructField("a", BooleanType, true) :: @@ -544,16 +544,16 @@ class JsonSuite extends QueryTest { StructField("field", BooleanType, true) :: Nil), true) :: StructField("e", StringType, true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") } 
test("Loading a JSON dataset from a text file") { val file = getTempFilePath("json") val path = file.toString primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonSchemaRDD = jsonFile(path) + val jsonDF = jsonFile(path) val expectedSchema = StructType( StructField("bigInteger", DecimalType.Unlimited, true) :: @@ -564,9 +564,9 @@ class JsonSuite extends QueryTest { StructField("null", StringType, true) :: StructField("string", StringType, true) :: Nil) - assert(expectedSchema === jsonSchemaRDD.schema) + assert(expectedSchema === jsonDF.schema) - jsonSchemaRDD.registerTempTable("jsonTable") + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select * from jsonTable"), @@ -620,11 +620,11 @@ class JsonSuite extends QueryTest { StructField("null", StringType, true) :: StructField("string", StringType, true) :: Nil) - val jsonSchemaRDD1 = jsonFile(path, schema) + val jsonDF1 = jsonFile(path, schema) - assert(schema === jsonSchemaRDD1.schema) + assert(schema === jsonDF1.schema) - jsonSchemaRDD1.registerTempTable("jsonTable1") + jsonDF1.registerTempTable("jsonTable1") checkAnswer( sql("select * from jsonTable1"), @@ -637,11 +637,11 @@ class JsonSuite extends QueryTest { "this is a simple string.") ) - val jsonSchemaRDD2 = jsonRDD(primitiveFieldAndType, schema) + val jsonDF2 = jsonRDD(primitiveFieldAndType, schema) - assert(schema === jsonSchemaRDD2.schema) + assert(schema === jsonDF2.schema) - jsonSchemaRDD2.registerTempTable("jsonTable2") + jsonDF2.registerTempTable("jsonTable2") checkAnswer( sql("select * from jsonTable2"), @@ -656,8 +656,8 @@ class JsonSuite extends QueryTest { } test("SPARK-2096 Correctly parse dot notations") { - val jsonSchemaRDD = jsonRDD(complexFieldAndType2) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(complexFieldAndType2) + jsonDF.registerTempTable("jsonTable") checkAnswer( sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), @@ -674,8 +674,8 @@ class JsonSuite extends QueryTest { } test("SPARK-3390 Complex arrays") { - val jsonSchemaRDD = jsonRDD(complexFieldAndType2) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(complexFieldAndType2) + jsonDF.registerTempTable("jsonTable") checkAnswer( sql( @@ -697,8 +697,8 @@ class JsonSuite extends QueryTest { } test("SPARK-3308 Read top level JSON arrays") { - val jsonSchemaRDD = jsonRDD(jsonArray) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(jsonArray) + jsonDF.registerTempTable("jsonTable") checkAnswer( sql( @@ -718,8 +718,8 @@ class JsonSuite extends QueryTest { val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") - val jsonSchemaRDD = jsonRDD(corruptRecords) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(corruptRecords) + jsonDF.registerTempTable("jsonTable") val schema = StructType( StructField("_unparsed", StringType, true) :: @@ -727,7 +727,7 @@ class JsonSuite extends QueryTest { StructField("b", StringType, true) :: StructField("c", StringType, true) :: Nil) - assert(schema === jsonSchemaRDD.schema) + assert(schema === jsonDF.schema) // In HiveContext, backticks should be used to access columns starting with a underscore. 
checkAnswer( @@ -772,8 +772,8 @@ class JsonSuite extends QueryTest { } test("SPARK-4068: nulls in arrays") { - val jsonSchemaRDD = jsonRDD(nullsInArrays) - jsonSchemaRDD.registerTempTable("jsonTable") + val jsonDF = jsonRDD(nullsInArrays) + jsonDF.registerTempTable("jsonTable") val schema = StructType( StructField("field1", @@ -787,7 +787,7 @@ class JsonSuite extends QueryTest { StructField("field4", ArrayType(ArrayType(ArrayType(IntegerType, false), true), false), true) :: Nil) - assert(schema === jsonSchemaRDD.schema) + assert(schema === jsonDF.schema) checkAnswer( sql( @@ -802,7 +802,7 @@ class JsonSuite extends QueryTest { ) } - test("SPARK-4228 SchemaRDD to JSON") + test("SPARK-4228 DataFrame to JSON") { val schema1 = StructType( StructField("f1", IntegerType, false) :: @@ -819,10 +819,10 @@ class JsonSuite extends QueryTest { Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5) } - val schemaRDD1 = applySchema(rowRDD1, schema1) - schemaRDD1.registerTempTable("applySchema1") - val schemaRDD2 = schemaRDD1.toDF - val result = schemaRDD2.toJSON.collect() + val df1 = applySchema(rowRDD1, schema1) + df1.registerTempTable("applySchema1") + val df2 = df1.toDataFrame + val result = df2.toJSON.collect() assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}") assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}") @@ -840,16 +840,16 @@ class JsonSuite extends QueryTest { Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4)) } - val schemaRDD3 = applySchema(rowRDD2, schema2) - schemaRDD3.registerTempTable("applySchema2") - val schemaRDD4 = schemaRDD3.toDF - val result2 = schemaRDD4.toJSON.collect() + val df3 = applySchema(rowRDD2, schema2) + df3.registerTempTable("applySchema2") + val df4 = df3.toDataFrame + val result2 = df4.toJSON.collect() assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}") assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}") - val jsonSchemaRDD = jsonRDD(primitiveFieldAndType) - val primTable = jsonRDD(jsonSchemaRDD.toJSON) + val jsonDF = jsonRDD(primitiveFieldAndType) + val primTable = jsonRDD(jsonDF.toJSON) primTable.registerTempTable("primativeTable") checkAnswer( sql("select * from primativeTable"), @@ -861,8 +861,8 @@ class JsonSuite extends QueryTest { "this is a simple string.") ) - val complexJsonSchemaRDD = jsonRDD(complexFieldAndType1) - val compTable = jsonRDD(complexJsonSchemaRDD.toJSON) + val complexJsonDF = jsonRDD(complexFieldAndType1) + val compTable = jsonRDD(complexJsonDF.toJSON) compTable.registerTempTable("complexTable") // Access elements of a primitive array. 
checkAnswer( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 8e70ae8f56..822864f8ef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -396,7 +396,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN) } - clearCache() + cacheManager.clearCache() loadedTables.clear() catalog.cachedDataSourceTables.invalidateAll() catalog.client.getAllTables("default").foreach { t => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 42bc8a0b67..91af35f096 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -239,7 +239,7 @@ case class InsertIntoHiveTable( } // Invalidate the cache. - sqlContext.invalidateCache(table) + sqlContext.cacheManager.invalidateCache(table) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 91f9da35ab..4814cb7ebf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -54,7 +54,7 @@ case class DropTable( val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" try { - hiveContext.tryUncacheQuery(hiveContext.table(tableName)) + hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName)) } catch { // This table's metadata is not in case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 5775d83fcb..4dd96bd5a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest { // Make sure the table has been updated. checkAnswer( sql("SELECT * FROM createAndInsertTest"), - testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq + testData.toDataFrame.collect().toSeq ++ testData.toDataFrame.collect().toSeq ) // Now overwrite. 
@@ -82,8 +82,8 @@ class InsertIntoHiveTableSuite extends QueryTest { val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil) val rowRDD = TestHive.sparkContext.parallelize( (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i")))) - val schemaRDD = applySchema(rowRDD, schema) - schemaRDD.registerTempTable("tableWithMapValue") + val df = applySchema(rowRDD, schema) + df.registerTempTable("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m MAP )") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -127,8 +127,8 @@ class InsertIntoHiveTableSuite extends QueryTest { val schema = StructType(Seq( StructField("a", ArrayType(StringType, containsNull = false)))) val rowRDD = TestHive.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) - val schemaRDD = applySchema(rowRDD, schema) - schemaRDD.registerTempTable("tableWithArrayValue") + val df = applySchema(rowRDD, schema) + df.registerTempTable("tableWithArrayValue") sql("CREATE TABLE hiveTableWithArrayValue(a Array )") sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue") @@ -144,8 +144,8 @@ class InsertIntoHiveTableSuite extends QueryTest { StructField("m", MapType(StringType, StringType, valueContainsNull = false)))) val rowRDD = TestHive.sparkContext.parallelize( (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i")))) - val schemaRDD = applySchema(rowRDD, schema) - schemaRDD.registerTempTable("tableWithMapValue") + val df = applySchema(rowRDD, schema) + df.registerTempTable("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m Map )") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -161,8 +161,8 @@ class InsertIntoHiveTableSuite extends QueryTest { StructField("s", StructType(Seq(StructField("f", StringType, nullable = false)))))) val rowRDD = TestHive.sparkContext.parallelize( (1 to 100).map(i => Row(Row(s"value$i")))) - val schemaRDD = applySchema(rowRDD, schema) - schemaRDD.registerTempTable("tableWithStructValue") + val df = applySchema(rowRDD, schema) + df.registerTempTable("tableWithStructValue") sql("CREATE TABLE hiveTableWithStructValue(s Struct )") sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index d67b00bc9d..0c8a113c75 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -368,7 +368,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") } - test("SchemaRDD toString") { + test("DataFrame toString") { sql("SHOW TABLES").toString sql("SELECT * FROM src").toString } @@ -479,7 +479,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { explanation.contains("== Physical Plan ==") } - test("SPARK-1704: Explain commands as a SchemaRDD") { + test("SPARK-1704: Explain commands as a DataFrame") { sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") val rdd = sql("explain select key, count(value) from src group by key") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 
7f9f1ac7cd..faa7357b90 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -222,7 +222,7 @@ class SQLQuerySuite extends QueryTest { sql("SELECT distinct key FROM src order by key").collect().toSeq) } - test("SPARK-4963 SchemaRDD sample on mutable row return wrong result") { + test("SPARK-4963 DataFrame sample on mutable row return wrong result") { sql("SELECT * FROM src WHERE key % 2 = 0") .sample(withReplacement = false, fraction = 0.3) .registerTempTable("sampled") -- cgit v1.2.3
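
For readers following the rename this patch applies, here is a minimal user-facing sketch, not part of the patch itself: it assumes a local SparkContext, a hypothetical `Record` case class, and the Spark 1.3-era `SQLContext` API exactly as shown in the diff. `createSchemaRDD`/`toSchemaRDD` become `createDataFrame`/`toDataFrame`, while table-level caching keeps the public `cacheTable`/`isCached`/`uncacheTable` entry points, which after this change delegate to the new internal `cacheManager` field.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical case class used only so Spark SQL can infer a schema by reflection.
    case class Record(key: Int, value: String)

    object SchemaRddToDataFrameSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("rename-sketch").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)

        // Before this patch: import sqlContext.createSchemaRDD and call .toSchemaRDD.
        // After this patch the implicit and the no-op conversion are renamed:
        import sqlContext.createDataFrame

        val df: DataFrame = sc.parallelize(1 to 10)
          .map(i => Record(i, i.toString))
          .toDataFrame                          // was .toSchemaRDD
        df.registerTempTable("records")

        // Caching is still driven through the public SQLContext methods; after this
        // patch they simply delegate to the new `cacheManager` field instead of a
        // CacheManager trait mixed into SQLContext.
        sqlContext.cacheTable("records")
        assert(sqlContext.isCached("records"))

        println(sqlContext.sql("SELECT COUNT(*) FROM records").collect().mkString(", "))

        sqlContext.uncacheTable("records")
        sc.stop()
      }
    }

Making CacheManager a concrete class held in a `protected[sql]` field, rather than a trait mixed into SQLContext, keeps the cache bookkeeping (`cacheQuery`, `tryUncacheQuery`, `lookupCachedData`, `clearCache`) off the public SQLContext surface; only the thin wrappers used above remain user-facing.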