aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala22
1 files changed, 12 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 14b8b6fc3b..124ec09efd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -74,12 +75,12 @@ private[sql] class CacheManager extends Logging {
}
/**
- * Caches the data produced by the logical representation of the given [[Queryable]].
+ * Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
private[sql] def cacheQuery(
- query: Queryable,
+ query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
@@ -99,8 +100,8 @@ private[sql] class CacheManager extends Logging {
}
}
- /** Removes the data for the given [[Queryable]] from the cache */
- private[sql] def uncacheQuery(query: Queryable, blocking: Boolean = true): Unit = writeLock {
+ /** Removes the data for the given [[Dataset]] from the cache */
+ private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
require(dataIndex >= 0, s"Table $query is not cached.")
@@ -108,11 +109,12 @@ private[sql] class CacheManager extends Logging {
cachedData.remove(dataIndex)
}
- /** Tries to remove the data for the given [[Queryable]] from the cache
- * if it's cached
- */
+ /**
+ * Tries to remove the data for the given [[Dataset]] from the cache
+ * if it's cached
+ */
private[sql] def tryUncacheQuery(
- query: Queryable,
+ query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
@@ -124,8 +126,8 @@ private[sql] class CacheManager extends Logging {
found
}
- /** Optionally returns cached data for the given [[Queryable]] */
- private[sql] def lookupCachedData(query: Queryable): Option[CachedData] = readLock {
+ /** Optionally returns cached data for the given [[Dataset]] */
+ private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
lookupCachedData(query.queryExecution.analyzed)
}