diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 14b8b6fc3b..124ec09efd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.Dataset import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK @@ -74,12 +75,12 @@ private[sql] class CacheManager extends Logging { } /** - * Caches the data produced by the logical representation of the given [[Queryable]]. + * Caches the data produced by the logical representation of the given [[Dataset]]. * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because * recomputing the in-memory columnar representation of the underlying table is expensive. */ private[sql] def cacheQuery( - query: Queryable, + query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { val planToCache = query.queryExecution.analyzed @@ -99,8 +100,8 @@ private[sql] class CacheManager extends Logging { } } - /** Removes the data for the given [[Queryable]] from the cache */ - private[sql] def uncacheQuery(query: Queryable, blocking: Boolean = true): Unit = writeLock { + /** Removes the data for the given [[Dataset]] from the cache */ + private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) require(dataIndex >= 0, s"Table $query is not cached.") @@ -108,11 +109,12 @@ private[sql] class CacheManager extends Logging { cachedData.remove(dataIndex) } - /** Tries to remove the data for the given [[Queryable]] from the cache - * if it's cached - */ + /** + * Tries to remove the data for the given [[Dataset]] from the cache + * if it's cached + */ private[sql] def tryUncacheQuery( - query: Queryable, + query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) @@ -124,8 +126,8 @@ private[sql] class CacheManager extends Logging { found } - /** Optionally returns cached data for the given [[Queryable]] */ - private[sql] def lookupCachedData(query: Queryable): Option[CachedData] = readLock { + /** Optionally returns cached data for the given [[Dataset]] */ + private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock { lookupCachedData(query.queryExecution.analyzed) } |