author     Cheng Lian <lian.cs.zju@gmail.com>          2014-10-09 18:26:43 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-10-09 18:26:43 -0700
commit     421382d0e728940caa3e61bc11237c61f256378a (patch)
tree       28870630006cb962b197bf471a38151a71f5d50b /sql
parent     edf02da389f75df5a42465d41f035d6b65599848 (diff)
[SPARK-3824][SQL] Sets in-memory table default storage level to MEMORY_AND_DISK
Using `MEMORY_AND_DISK` as the default storage level for in-memory table caching. Due to the in-memory columnar representation, recomputing partitions of an in-memory cached table can be very expensive.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2686 from liancheng/spark-3824 and squashes the following commits:

35d2ed0 [Cheng Lian] Removes extra space
1ab7967 [Cheng Lian] Reduces test data size to fit DiskStore.getBytes()
ba565f0 [Cheng Lian] Makes CachedBatch serializable
07f0204 [Cheng Lian] Sets in-memory table default storage level to MEMORY_AND_DISK
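To make the new default concrete, here is a minimal usage sketch against the Spark 1.x `SQLContext` API this patch touches; the context setup and the table name "people" are illustrative, not part of the patch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

// Illustrative setup; any existing SparkContext works. Assumes a temp
// table "people" has already been registered.
val sc = new SparkContext("local", "cache-demo")
val sqlContext = new SQLContext(sc)

// After this patch, cacheTable builds the columnar cache with
// MEMORY_AND_DISK: partitions evicted from memory are read back from
// disk rather than re-encoded into the columnar format.
sqlContext.cacheTable("people")

// The old MEMORY_ONLY behavior remains available by persisting the
// SchemaRDD explicitly instead:
sqlContext.table("people").persist(StorageLevel.MEMORY_ONLY)
```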
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                       | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                   | 10
3 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3bf7382ac6..5ab2b5316a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
/** Holds a cached logical plan and its data */
private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -74,10 +74,14 @@ private[sql] trait CacheManager {
cachedData.clear()
}
- /** Caches the data produced by the logical representation of the given schema rdd. */
+ /**
+ * Caches the data produced by the logical representation of the given schema rdd. Unlike
+ * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
+ * the in-memory columnar representation of the underlying table is expensive.
+ */
private[sql] def cacheQuery(
query: SchemaRDD,
- storageLevel: StorageLevel = MEMORY_ONLY): Unit = writeLock {
+ storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.optimizedPlan
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 4f79173a26..22ab0e2613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -38,7 +38,7 @@ private[sql] object InMemoryRelation {
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
}
-private[sql] case class CachedBatch(buffers: Array[ByteBuffer], stats: Row)
+private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
@@ -91,7 +91,7 @@ private[sql] case class InMemoryRelation(
val stats = Row.fromSeq(
columnBuilders.map(_.columnStats.collectedStatistics).foldLeft(Seq.empty[Any])(_ ++ _))
- CachedBatch(columnBuilders.map(_.build()), stats)
+ CachedBatch(columnBuilders.map(_.build().array()), stats)
}
def hasNext = rowIterator.hasNext
@@ -238,8 +238,9 @@ private[sql] case class InMemoryColumnarTableScan(
def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
val rows = cacheBatches.flatMap { cachedBatch =>
// Build column accessors
- val columnAccessors =
- requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
+ val columnAccessors = requestedColumnIndices.map { batch =>
+ ColumnAccessor(ByteBuffer.wrap(cachedBatch.buffers(batch)))
+ }
// Extract rows via column accessors
new Iterator[Row] {
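The switch from `Array[ByteBuffer]` to `Array[Array[Byte]]` is what the "Makes CachedBatch serializable" commit refers to: `java.nio.ByteBuffer` is not `java.io.Serializable`, and under `MEMORY_AND_DISK` cached batches must survive being serialized to disk. A minimal sketch of the resulting round trip, with illustrative data values:

```scala
import java.nio.ByteBuffer

// Write path: keep the raw backing bytes, which are serializable and can
// therefore spill to disk under MEMORY_AND_DISK. A literal byte array
// stands in for `columnBuilders.map(_.build().array())` here.
val stored: Array[Byte] = Array[Byte](1, 2, 3, 4)

// Read path: re-wrap the bytes without copying before handing them to a
// column accessor, as `ColumnAccessor(ByteBuffer.wrap(...))` does above.
val view: ByteBuffer = ByteBuffer.wrap(stored)
assert(view.remaining() == stored.length)
```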
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index c87ded81fd..444bc95009 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.test.TestSQLContext._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{StorageLevel, RDDBlockId}
case class BigData(s: String)
@@ -55,10 +55,10 @@ class CachedTableSuite extends QueryTest {
test("too big for memory") {
val data = "*" * 10000
- sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
- cacheTable("bigData")
- assert(table("bigData").count() === 1000000L)
- uncacheTable("bigData")
+ sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+ table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
+ assert(table("bigData").count() === 200000L)
+ table("bigData").unpersist()
}
test("calling .cache() should use in-memory columnar caching") {