about summary refs log tree commit diff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorNick Pentreath <nickp@za.ibm.com>2016-10-14 15:07:32 -0700
committerMichael Armbrust <michael@databricks.com>2016-10-14 15:09:49 -0700
commit5aeb7384c7aa5f487f031f9ae07d3f1653399d14 (patch)
tree22c2fb40cc786812883239ba32cd8b7097eda57c /sql/core/src/test
parentda9aeb0fde589f7c21c2f4a32036a68c0353965d (diff)
downloadspark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.tar.gz
spark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.tar.bz2
spark-5aeb7384c7aa5f487f031f9ae07d3f1653399d14.zip
[SPARK-16063][SQL] Add storageLevel to Dataset
[SPARK-11905](https://issues.apache.org/jira/browse/SPARK-11905) added support for `persist`/`cache` for `Dataset`. However, there is no user-facing API to check if a `Dataset` is cached and if so what the storage level is. This PR adds `storageLevel` to `Dataset`, analogous to `RDD.getStorageLevel`. Updated `DatasetCacheSuite`. Author: Nick Pentreath <nickp@za.ibm.com> Closes #13780 from MLnick/ds-storagelevel. Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala | 36
1 file changed, 26 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 8d5e9645df..e0561ee279 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -19,11 +19,32 @@ package org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.storage.StorageLevel
class DatasetCacheSuite extends QueryTest with SharedSQLContext {
import testImplicits._
+ test("get storage level") {
+ val ds1 = Seq("1", "2").toDS().as("a")
+ val ds2 = Seq(2, 3).toDS().as("b")
+
+ // default storage level
+ ds1.persist()
+ ds2.cache()
+ assert(ds1.storageLevel == StorageLevel.MEMORY_AND_DISK)
+ assert(ds2.storageLevel == StorageLevel.MEMORY_AND_DISK)
+ // unpersist
+ ds1.unpersist()
+ assert(ds1.storageLevel == StorageLevel.NONE)
+ // non-default storage level
+ ds1.persist(StorageLevel.MEMORY_ONLY_2)
+ assert(ds1.storageLevel == StorageLevel.MEMORY_ONLY_2)
+ // joined Dataset should not be persisted
+ val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
+ assert(joined.storageLevel == StorageLevel.NONE)
+ }
+
test("persist and unpersist") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int])
val cached = ds.cache()
@@ -37,8 +58,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
2, 3, 4)
// Drop the cache.
cached.unpersist()
- assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty,
- "The Dataset should not be cached.")
+ assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.")
}
test("persist and then rebind right encoder when join 2 datasets") {
@@ -55,11 +75,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(joined, 2)
ds1.unpersist()
- assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty,
- "The Dataset ds1 should not be cached.")
+ assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.")
ds2.unpersist()
- assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty,
- "The Dataset ds2 should not be cached.")
+ assert(ds2.storageLevel == StorageLevel.NONE, "The Dataset ds2 should not be cached.")
}
test("persist and then groupBy columns asKey, map") {
@@ -74,10 +92,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(agged.filter(_._1 == "b"))
ds.unpersist()
- assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty,
- "The Dataset ds should not be cached.")
+ assert(ds.storageLevel == StorageLevel.NONE, "The Dataset ds should not be cached.")
agged.unpersist()
- assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty,
- "The Dataset agged should not be cached.")
+ assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
}
}