author    Michael Armbrust <michael@databricks.com>  2014-07-12 12:13:32 -0700
committer Reynold Xin <rxin@apache.org>              2014-07-12 12:13:32 -0700
commit    1a7d7cc85fb24de21f1cde67d04467171b82e845
tree      eb2fd51ba89ab26ed21c4561bddbdfb0fe456ec0 /sql
parent    7e26b57615f6c1d3f9058f9c19c05ec91f017f4c
[SPARK-2405][SQL] Reuse same byte buffers when creating new instance of InMemoryRelation
Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan.

Author: Michael Armbrust <michael@databricks.com>

Closes #1332 from marmbrus/doubleCache and squashes the following commits:

4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffers in the constructor.
b39c931 [Michael Armbrust] Allocations are kind of a side effect.
f67eff7 [Michael Armbrust] Reuse same byte buffers when creating new instance of InMemoryRelation
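In essence, the patch threads the cached column buffers through a second constructor parameter list, so that newInstance() hands the already-built RDD[Array[ByteBuffer]] to the fresh copy instead of triggering a second caching pass. Below is a minimal sketch of that pattern with simplified, hypothetical types (the real relation carries output attributes, a useCompression flag, and a SparkPlan child):

// Minimal sketch (hypothetical, simplified types) of the pattern used by this patch:
// expensive cached state lives in a second parameter list so copies made by
// newInstance() share it instead of rebuilding it.
import java.nio.ByteBuffer

case class CachedRelation(name: String)(
    private var _buffers: Array[ByteBuffer] = null) {

  // Build the buffers only if an earlier instance did not pass them in.
  if (_buffers == null) {
    _buffers = Array(ByteBuffer.allocate(1024))
  }

  // A new instance gets a fresh first parameter list but reuses the same buffers.
  def newInstance(): CachedRelation = new CachedRelation(name)(_buffers)

  def buffers: Array[ByteBuffer] = _buffers
}

object CachedRelationDemo extends App {
  val original = new CachedRelation("t")()
  val copy     = original.newInstance()
  // Both instances point at the same buffers, so the expensive work happens once.
  assert(copy.buffers eq original.buffers)
}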
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala             35
2 files changed, 25 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index a6ce90854d..22941edef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* of itself with globally unique expression ids.
*/
trait MultiInstanceRelation {
- def newInstance: this.type
+ def newInstance(): this.type
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index e1e4f24c6c..ff7f664d8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.columnar
+import java.nio.ByteBuffer
+
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,22 +29,19 @@ import org.apache.spark.SparkConf
object InMemoryRelation {
def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, child)
+ new InMemoryRelation(child.output, useCompression, child)()
}
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
child: SparkPlan)
+ (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
extends LogicalPlan with MultiInstanceRelation {
- override def children = Seq.empty
- override def references = Set.empty
-
- override def newInstance() =
- new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
-
- lazy val cachedColumnBuffers = {
+ // If the cached column buffers were not passed in, we calculate them in the constructor.
+ // As in Spark, the actual work of caching is lazy.
+ if (_cachedColumnBuffers == null) {
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { attribute =>
@@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation(
}.cache()
cached.setName(child.toString)
- // Force the materialization of the cached RDD.
- cached.count()
- cached
+ _cachedColumnBuffers = cached
}
+
+
+ override def children = Seq.empty
+
+ override def references = Set.empty
+
+ override def newInstance() = {
+ new InMemoryRelation(
+ output.map(_.newInstance),
+ useCompression,
+ child)(
+ _cachedColumnBuffers).asInstanceOf[this.type]
+ }
+
+ def cachedColumnBuffers = _cachedColumnBuffers
}
private[sql] case class InMemoryColumnarTableScan(
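A side note on the curried case-class pattern (general Scala behavior, not something stated in the commit itself): only the first parameter list feeds the generated equals and hashCode, so carrying _cachedColumnBuffers in a second list leaves the relation's structural equality defined by output, useCompression, and child alone. A tiny illustration with hypothetical names:

// Illustrative only: state in a second parameter list does not affect case class equality.
case class Rel(name: String)(val payload: String)

object RelEqualityDemo extends App {
  val a = new Rel("t")("buffers-A")
  val b = new Rel("t")("buffers-B")
  // Equal despite different payloads: equals/hashCode come from the first list only.
  assert(a == b)
}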