From 1a7d7cc85fb24de21f1cde67d04467171b82e845 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 12 Jul 2014 12:13:32 -0700 Subject: [SPARK-2405][SQL] Reuse same byte buffers when creating new instance of InMemoryRelation Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan. Author: Michael Armbrust Closes #1332 from marmbrus/doubleCache and squashes the following commits: 4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffers in the constructor. b39c931 [Michael Armbrust] Allocations are kind of a side effect. f67eff7 [Michael Armbrust] Reuse same byte buffers when creating new instance of InMemoryRelation --- .../catalyst/analysis/MultiInstanceRelation.scala | 2 +- .../sql/columnar/InMemoryColumnarTableScan.scala | 35 +++++++++++++++------- 2 files changed, 25 insertions(+), 12 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala index a6ce90854d..22941edef2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * of itself with globally unique expression ids. 
*/ trait MultiInstanceRelation { - def newInstance: this.type + def newInstance(): this.type } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index e1e4f24c6c..ff7f664d8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.columnar +import java.nio.ByteBuffer + +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -26,22 +29,19 @@ import org.apache.spark.SparkConf object InMemoryRelation { def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, child) + new InMemoryRelation(child.output, useCompression, child)() } private[sql] case class InMemoryRelation( output: Seq[Attribute], useCompression: Boolean, child: SparkPlan) + (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null) extends LogicalPlan with MultiInstanceRelation { - override def children = Seq.empty - override def references = Set.empty - - override def newInstance() = - new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type] - - lazy val cachedColumnBuffers = { + // If the cached column buffers were not passed in, we calculate them in the constructor. + // As in Spark, the actual work of caching is lazy. 
+ if (_cachedColumnBuffers == null) { val output = child.output val cached = child.execute().mapPartitions { iterator => val columnBuilders = output.map { attribute => @@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation( }.cache() cached.setName(child.toString) - // Force the materialization of the cached RDD. - cached.count() - cached + _cachedColumnBuffers = cached } + + + override def children = Seq.empty + + override def references = Set.empty + + override def newInstance() = { + new InMemoryRelation( + output.map(_.newInstance), + useCompression, + child)( + _cachedColumnBuffers).asInstanceOf[this.type] + } + + def cachedColumnBuffers = _cachedColumnBuffers } private[sql] case class InMemoryColumnarTableScan( -- cgit v1.2.3