aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-04-18 10:02:27 -0700
committerReynold Xin <rxin@apache.org>2014-04-18 10:02:27 -0700
commit89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3 (patch)
tree97fbc68117ec09ac781ac4bfc2fbe3eda83253e3 /sql
parente31c8ffca65e0e5cd5f1a6229f3d654a24b7b18c (diff)
downloadspark-89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3.tar.gz
spark-89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3.tar.bz2
spark-89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3.zip
Reuses Row object in ExistingRdd.productToRowRdd()
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #432 from liancheng/reuseRow and squashes the following commits:
9e6d083 [Cheng Lian] Simplified code with BufferedIterator
52acec9 [Cheng Lian] Reuses Row object in ExistingRdd.productToRowRdd()
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala21
1 files changed, 18 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index ab2e624637..eedcc7dda0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
-
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output = projectList.map(_.toAttribute)
@@ -143,8 +142,24 @@ object ExistingRdd {
}
def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
-  // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen?
-  data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
+  data.mapPartitions { iterator =>
+    if (iterator.isEmpty) {
+      Iterator.empty
+    } else {
+      val bufferedIterator = iterator.buffered
+      // Sized from the first element; this single row is overwritten and returned for every element.
+      val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+      bufferedIterator.map { r =>
+        var i = 0
+        while (i < mutableRow.length) {
+          // Convert each field with convertToCatalyst, as the replaced data.map version did.
+          mutableRow(i) = convertToCatalyst(r.productElement(i))
+          i += 1
+        }
+        mutableRow
+      }
+    }
+  }
}
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {