aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala35
2 files changed, 25 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index a6ce90854d..22941edef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* of itself with globally unique expression ids.
*/
trait MultiInstanceRelation {
- def newInstance: this.type
+ def newInstance(): this.type
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index e1e4f24c6c..ff7f664d8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.columnar
+import java.nio.ByteBuffer
+
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,22 +29,19 @@ import org.apache.spark.SparkConf
object InMemoryRelation {
def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, child)
+ new InMemoryRelation(child.output, useCompression, child)()
}
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
child: SparkPlan)
+ (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
extends LogicalPlan with MultiInstanceRelation {
- override def children = Seq.empty
- override def references = Set.empty
-
- override def newInstance() =
- new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
-
- lazy val cachedColumnBuffers = {
+ // If the cached column buffers were not passed in, we calculate them in the constructor.
+ // As in Spark, the actual work of caching is lazy.
+ if (_cachedColumnBuffers == null) {
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { attribute =>
@@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation(
}.cache()
cached.setName(child.toString)
- // Force the materialization of the cached RDD.
- cached.count()
- cached
+ _cachedColumnBuffers = cached
}
+
+
+ override def children = Seq.empty
+
+ override def references = Set.empty
+
+ override def newInstance() = {
+ new InMemoryRelation(
+ output.map(_.newInstance),
+ useCompression,
+ child)(
+ _cachedColumnBuffers).asInstanceOf[this.type]
+ }
+
+ def cachedColumnBuffers = _cachedColumnBuffers
}
private[sql] case class InMemoryColumnarTableScan(