diff options
author | Cheng Lian <lian@databricks.com> | 2014-11-20 13:12:24 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-20 13:12:24 -0800 |
commit | abf29187f0342b607fcefe269391d4db58d2a957 (patch) | |
tree | 783cca4188cc5627e96e0d394901eff0e8ff5c08 /sql/core/src | |
parent | 15cacc81240eed8834b4730c5c6dc3238f003465 (diff) | |
download | spark-abf29187f0342b607fcefe269391d4db58d2a957.tar.gz spark-abf29187f0342b607fcefe269391d4db58d2a957.tar.bz2 spark-abf29187f0342b607fcefe269391d4db58d2a957.zip |
[SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name
This PR enables the Web UI storage tab to show the in-memory table name instead of the mysterious query plan string as the name of the in-memory columnar RDD.
Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In this case, only the first cached table name is shown. For example:
```sql
CACHE TABLE first AS SELECT * FROM src;
CACHE TABLE second AS SELECT * FROM src;
```
The Web UI only shows "In-memory table first".
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3383)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits:
071907f [Cheng Lian] Fixes tests
12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name
Diffstat (limited to 'sql/core/src')
6 files changed, 23 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index 2e7abac1f1..3c9439b2e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -46,7 +46,7 @@ private[sql] trait CacheManager { def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty /** Caches the specified table in-memory. */ - def cacheTable(tableName: String): Unit = cacheQuery(table(tableName)) + def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName)) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName)) @@ -81,6 +81,7 @@ private[sql] trait CacheManager { */ private[sql] def cacheQuery( query: SchemaRDD, + tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { val planToCache = query.queryExecution.analyzed if (lookupCachedData(planToCache).nonEmpty) { @@ -90,7 +91,11 @@ private[sql] trait CacheManager { CachedData( planToCache, InMemoryRelation( - useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan)) + useCompression, + columnBatchSize, + storageLevel, + query.queryExecution.executedPlan, + tableName)) } }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index fbec2f9f4b..904a276ef3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -475,7 +475,7 @@ class SchemaRDD( } override def persist(newLevel: StorageLevel): this.type = { - sqlContext.cacheQuery(this, newLevel) + sqlContext.cacheQuery(this, None, newLevel) this }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 881d32b105..0cebe823b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -36,8 +36,9 @@ private[sql] object InMemoryRelation { useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, - child: SparkPlan): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)() + child: SparkPlan, + tableName: Option[String]): InMemoryRelation = + new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() } private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row) @@ -47,7 +48,8 @@ private[sql] case class InMemoryRelation( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, - child: SparkPlan)( + child: SparkPlan, + tableName: Option[String])( private var _cachedColumnBuffers: RDD[CachedBatch] = null, private var _statistics: Statistics = null) extends LogicalPlan with MultiInstanceRelation { @@ -137,13 +139,13 @@ private[sql] case class InMemoryRelation( } }.persist(storageLevel) - cached.setName(child.toString) + cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString)) _cachedColumnBuffers = cached } def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { InMemoryRelation( - newOutput, useCompression, batchSize, storageLevel, child)( + newOutput, useCompression, batchSize, storageLevel, child, tableName)( _cachedColumnBuffers, statisticsToBePropagated) } @@ -155,7 +157,8 @@ private[sql] case class InMemoryRelation( useCompression, batchSize, storageLevel, - child)( + child, + tableName)( _cachedColumnBuffers, statisticsToBePropagated).asInstanceOf[this.type] }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index f23b9c48cf..afe3f3f074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -160,12 +160,11 @@ case class CacheTableCommand( import sqlContext._ plan.foreach(_.registerTempTable(tableName)) - val schemaRDD = table(tableName) - schemaRDD.cache() + cacheTable(tableName) if (!isLazy) { // Performs eager caching - schemaRDD.count() + table(tableName).count() } Seq.empty[Row]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 765fa82776..042210176a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -123,7 +123,7 @@ class CachedTableSuite extends QueryTest { cacheTable("testData") assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { table("testData").queryExecution.withCachedData.collect { - case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r + case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r }.size }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 15903d07df..fc95dccc74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -29,7 +29,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("simple columnar query") { val plan = executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().toSeq) } @@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("projection") { val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().map { case Row(key: Int, value: String) => value -> key @@ -53,7 +53,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { val plan = executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) checkAnswer(scan, testData.collect().toSeq) checkAnswer(scan, testData.collect().toSeq)