diff options
author | Nick Pentreath <nickp@za.ibm.com> | 2016-10-14 15:07:32 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-10-14 15:09:49 -0700 |
commit | 5aeb7384c7aa5f487f031f9ae07d3f1653399d14 (patch) | |
tree | 22c2fb40cc786812883239ba32cd8b7097eda57c /sql/core | |
parent | da9aeb0fde589f7c21c2f4a32036a68c0353965d (diff) | |
download | spark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.tar.gz spark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.tar.bz2 spark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.zip |
[SPARK-16063][SQL] Add storageLevel to Dataset
[SPARK-11905](https://issues.apache.org/jira/browse/SPARK-11905) added support for `persist`/`cache` for `Dataset`. However, there is no user-facing API to check if a `Dataset` is cached and if so what the storage level is. This PR adds `getStorageLevel` to `Dataset`, analogous to `RDD.getStorageLevel`.
Updated `DatasetCacheSuite`.
Author: Nick Pentreath <nickp@za.ibm.com>
Closes #13780 from MLnick/ds-storagelevel.
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 12 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala | 36 |
2 files changed, 38 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e59a483075..70c9cf5ae2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2402,6 +2402,18 @@ class Dataset[T] private[sql]( } /** + * Get the Dataset's current storage level, or StorageLevel.NONE if not persisted. + * + * @group basic + * @since 2.1.0 + */ + def storageLevel: StorageLevel = { + sparkSession.sharedState.cacheManager.lookupCachedData(this).map { cachedData => + cachedData.cachedRepresentation.storageLevel + }.getOrElse(StorageLevel.NONE) + } + + /** * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index 8d5e9645df..e0561ee279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -19,11 +19,32 @@ package org.apache.spark.sql import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.storage.StorageLevel class DatasetCacheSuite extends QueryTest with SharedSQLContext { import testImplicits._ + test("get storage level") { + val ds1 = Seq("1", "2").toDS().as("a") + val ds2 = Seq(2, 3).toDS().as("b") + + // default storage level + ds1.persist() + ds2.cache() + assert(ds1.storageLevel == StorageLevel.MEMORY_AND_DISK) + assert(ds2.storageLevel == StorageLevel.MEMORY_AND_DISK) + // unpersist + ds1.unpersist() + assert(ds1.storageLevel == StorageLevel.NONE) + // non-default storage level + ds1.persist(StorageLevel.MEMORY_ONLY_2) + assert(ds1.storageLevel == StorageLevel.MEMORY_ONLY_2) + // joined Dataset should not be persisted + val joined = ds1.joinWith(ds2, $"a.value" === $"b.value") + assert(joined.storageLevel == StorageLevel.NONE) + } + test("persist and unpersist") { val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int]) val cached = ds.cache() @@ -37,8 +58,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { 2, 3, 4) // Drop the cache. cached.unpersist() - assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty, - "The Dataset should not be cached.") + assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.") } test("persist and then rebind right encoder when join 2 datasets") { @@ -55,11 +75,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { assertCached(joined, 2) ds1.unpersist() - assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty, - "The Dataset ds1 should not be cached.") + assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.") ds2.unpersist() - assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty, - "The Dataset ds2 should not be cached.") + assert(ds2.storageLevel == StorageLevel.NONE, "The Dataset ds2 should not be cached.") } test("persist and then groupBy columns asKey, map") { @@ -74,10 +92,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { assertCached(agged.filter(_._1 == "b")) ds.unpersist() - assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty, - "The Dataset ds should not be cached.") + assert(ds.storageLevel == StorageLevel.NONE, "The Dataset ds should not be cached.") agged.unpersist() - assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty, - "The Dataset agged should not be cached.") + assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.") } } |