author     Cheng Lian <lian@databricks.com>  2014-11-20 13:12:24 -0800
committer  Michael Armbrust <michael@databricks.com>  2014-11-20 13:12:24 -0800
commit     abf29187f0342b607fcefe269391d4db58d2a957 (patch)
tree       783cca4188cc5627e96e0d394901eff0e8ff5c08 /sql
parent     15cacc81240eed8834b4730c5c6dc3238f003465 (diff)
[SPARK-3938][SQL] Names in-memory columnar RDD with corresponding table name
This PR enables the Web UI storage tab to show the in-memory table name instead of the mysterious query plan string as the name of the in-memory columnar RDD.

Note that after #2501, a single columnar RDD can be shared by multiple in-memory tables, as long as their query results are the same. In this case, only the first cached table name is shown. For example:

```sql
CACHE TABLE first AS SELECT * FROM src;
CACHE TABLE second AS SELECT * FROM src;
```

The Web UI only shows "In-memory table first".

Author: Cheng Lian <lian@databricks.com>

Closes #3383 from liancheng/columnar-rdd-name and squashes the following commits:

071907f [Cheng Lian] Fixes tests
12ddfa6 [Cheng Lian] Names in-memory columnar RDD with corresponding table name
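A minimal end-to-end sketch of the new behavior (not part of the patch; assumes a local Spark 1.x build, and "people.json" plus the table names are hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Cache a table by name and observe the RDD name shown in the storage tab.
val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
val sqlContext = new SQLContext(sc)

sqlContext.jsonFile("people.json").registerTempTable("src")
sqlContext.sql("CACHE TABLE first AS SELECT * FROM src")
// The storage tab now lists the cached RDD as "In-memory table first"
// instead of the executed plan's toString.
```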
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                        |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala                           |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala  | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                  |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                    |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala |  6
6 files changed, 23 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 2e7abac1f1..3c9439b2e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -46,7 +46,7 @@ private[sql] trait CacheManager {
def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty
/** Caches the specified table in-memory. */
- def cacheTable(tableName: String): Unit = cacheQuery(table(tableName))
+ def cacheTable(tableName: String): Unit = cacheQuery(table(tableName), Some(tableName))
/** Removes the specified table from the in-memory cache. */
def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName))
@@ -81,6 +81,7 @@ private[sql] trait CacheManager {
*/
private[sql] def cacheQuery(
query: SchemaRDD,
+ tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
if (lookupCachedData(planToCache).nonEmpty) {
@@ -90,7 +91,11 @@ private[sql] trait CacheManager {
CachedData(
planToCache,
InMemoryRelation(
- useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan))
+ useCompression,
+ columnBatchSize,
+ storageLevel,
+ query.queryExecution.executedPlan,
+ tableName))
}
}
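The optional table name slots in between the query and the storage level. A sketch of the two call shapes after this change (`cacheQuery` is `private[sql]`, so this only compiles inside that package; "events" is a hypothetical name):

```scala
package org.apache.spark.sql

import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

object CacheQuerySketch {
  // Both call shapes after this change; `query` is any SchemaRDD.
  def demo(sqlContext: SQLContext, query: SchemaRDD): Unit = {
    sqlContext.cacheQuery(query, Some("events"), MEMORY_AND_DISK) // named entry
    sqlContext.cacheQuery(query)                                  // tableName defaults to None
  }
}
```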
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index fbec2f9f4b..904a276ef3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -475,7 +475,7 @@ class SchemaRDD(
}
override def persist(newLevel: StorageLevel): this.type = {
- sqlContext.cacheQuery(this, newLevel)
+ sqlContext.cacheQuery(this, None, newLevel)
this
}
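Since a bare `persist` carries no table name, it forwards `None` explicitly; only the name-based entry point produces a named RDD. A sketch contrasting the two public entry points (not from the patch; `rdd` and "events" are placeholders, and "events" must already be registered):

```scala
import org.apache.spark.sql.{SQLContext, SchemaRDD}
import org.apache.spark.storage.StorageLevel

def twoEntryPoints(sqlContext: SQLContext, rdd: SchemaRDD): Unit = {
  rdd.persist(StorageLevel.MEMORY_AND_DISK) // name stays the query-plan string
  sqlContext.cacheTable("events")           // named "In-memory table events"
}
```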
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 881d32b105..0cebe823b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -36,8 +36,9 @@ private[sql] object InMemoryRelation {
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
- child: SparkPlan): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
+ child: SparkPlan,
+ tableName: Option[String]): InMemoryRelation =
+ new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
}
private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
@@ -47,7 +48,8 @@ private[sql] case class InMemoryRelation(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
- child: SparkPlan)(
+ child: SparkPlan,
+ tableName: Option[String])(
private var _cachedColumnBuffers: RDD[CachedBatch] = null,
private var _statistics: Statistics = null)
extends LogicalPlan with MultiInstanceRelation {
@@ -137,13 +139,13 @@ private[sql] case class InMemoryRelation(
}
}.persist(storageLevel)
- cached.setName(child.toString)
+ cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
_cachedColumnBuffers = cached
}
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
InMemoryRelation(
- newOutput, useCompression, batchSize, storageLevel, child)(
+ newOutput, useCompression, batchSize, storageLevel, child, tableName)(
_cachedColumnBuffers, statisticsToBePropagated)
}
@@ -155,7 +157,8 @@ private[sql] case class InMemoryRelation(
useCompression,
batchSize,
storageLevel,
- child)(
+ child,
+ tableName)(
_cachedColumnBuffers,
statisticsToBePropagated).asInstanceOf[this.type]
}
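The naming rule itself is a one-liner; here it is extracted into a standalone helper for clarity (the helper name is ours, not in the patch):

```scala
// Prefer the cached table's name; fall back to the child plan's toString.
def columnarRddName(tableName: Option[String], planString: String): String =
  tableName.map(n => s"In-memory table $n").getOrElse(planString)

assert(columnarRddName(Some("first"), "plan") == "In-memory table first")
assert(columnarRddName(None, "ExistingRdd [key,value]") == "ExistingRdd [key,value]")
```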
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f23b9c48cf..afe3f3f074 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -160,12 +160,11 @@ case class CacheTableCommand(
import sqlContext._
plan.foreach(_.registerTempTable(tableName))
- val schemaRDD = table(tableName)
- schemaRDD.cache()
+ cacheTable(tableName)
if (!isLazy) {
// Performs eager caching
- schemaRDD.count()
+ table(tableName).count()
}
Seq.empty[Row]
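Routing through `cacheTable(tableName)` is what threads the name into the cache entry; eagerness is unchanged. A sketch of the eager/lazy split from the user's side (table names hypothetical):

```scala
import org.apache.spark.sql.SQLContext

def cacheBothWays(sqlContext: SQLContext): Unit = {
  sqlContext.sql("CACHE TABLE eager_t AS SELECT * FROM src")     // scanned eagerly via count()
  sqlContext.sql("CACHE LAZY TABLE lazy_t AS SELECT * FROM src") // scanned on first use
}
```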
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 765fa82776..042210176a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -123,7 +123,7 @@ class CachedTableSuite extends QueryTest {
cacheTable("testData")
assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
table("testData").queryExecution.withCachedData.collect {
- case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
+ case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
}.size
}
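Because `InMemoryRelation` gained a sixth case-class field, every existing extractor needs one more wildcard; binding that field also recovers the optional name. A sketch (assumes code living in the `org.apache.spark.sql` package, like the suite above, so `withCachedData` is accessible):

```scala
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.columnar.InMemoryRelation

// Collect the names of all named in-memory tables backing a query.
def cachedTableNames(query: SchemaRDD): Seq[String] =
  query.queryExecution.withCachedData.collect {
    case InMemoryRelation(_, _, _, _, _, Some(name)) => name
  }
```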
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 15903d07df..fc95dccc74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -29,7 +29,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("simple columnar query") {
val plan = executePlan(testData.logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
}
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("projection") {
val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -53,7 +53,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = executePlan(testData.logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
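These suites pass `None` since no named table is involved; a hypothetical variant of the same setup exercising the new parameter would look like:

```scala
// Hypothetical: naming the relation would mark the materialized RDD
// as "In-memory table testData" in the storage tab.
val named = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, Some("testData"))
```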