From 5b28e02584fa4da85214e7da6d77b3b8e189b781 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 12 Jul 2016 17:16:59 +0800
Subject: [SPARK-16189][SQL] Add ExternalRDD logical plan for input with RDD
 to have a chance to eliminate serialize/deserialize.

## What changes were proposed in this pull request?

Currently the input `RDD` of a `Dataset` is always serialized to `RDD[InternalRow]` before being wrapped as a `Dataset`, but there are cases where `map` or `mapPartitions` is called right after the conversion to a `Dataset`. In those cases the data is serialized and then immediately deserialized, which is unnecessary.

This PR adds an `ExternalRDD` logical plan for `RDD` input so that the optimizer has a chance to eliminate the serialize/deserialize pair.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN

Closes #13890 from ueshin/issues/SPARK-16189.
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 12 +---
 .../apache/spark/sql/execution/ExistingRDD.scala   | 72 +++++++++++++++++++---
 .../spark/sql/execution/LocalTableScanExec.scala   |  8 ++-
 .../spark/sql/execution/SparkStrategies.scala      |  1 +
 .../scala/org/apache/spark/sql/QueryTest.scala     |  8 ++-
 5 files changed, 81 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a3fd39d42e..1271d1c55b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -237,10 +237,8 @@ class SparkSession private(
   @Experimental
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
     SparkSession.setActiveSession(this)
-    val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
-    val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRDD)(self))
+    val encoder = Encoders.product[A]
+    Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
   }
 
   /**
@@ -425,11 +423,7 @@ class SparkSession private(
    */
   @Experimental
   def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
-    val enc = encoderFor[T]
-    val attributes = enc.schema.toAttributes
-    val encoded = data.map(d => enc.toRow(d))
-    val plan = LogicalRDD(attributes, encoded)(self)
-    Dataset[T](self, plan)
+    Dataset[T](self, ExternalRDD(data, self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 09203e6998..491c2742ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Encoder, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -74,13 +74,71 @@ object RDDConversions {
   }
 }
 
+private[sql] object ExternalRDD {
+
+  def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
+    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
+    CatalystSerde.serialize[T](externalRdd)
+  }
+}
+
 /** Logical plan node for scanning data from an RDD. */
+private[sql] case class ExternalRDD[T](
+    outputObjAttr: Attribute,
+    rdd: RDD[T])(session: SparkSession)
+  extends LeafNode with ObjectProducer with MultiInstanceRelation {
+
+  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
+
+  override def newInstance(): ExternalRDD.this.type =
+    ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
+
+  override def sameResult(plan: LogicalPlan): Boolean = {
+    plan.canonicalized match {
+      case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
+      case _ => false
+    }
+  }
+
+  override protected def stringArgs: Iterator[Any] = Iterator(output)
+
+  @transient override lazy val statistics: Statistics = Statistics(
+    // TODO: Instead of returning a default value here, find a way to return a meaningful size
+    // estimate for RDDs. See PR 1238 for more discussions.
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
+}
+
+/** Physical plan node for scanning data from an RDD. */
+private[sql] case class ExternalRDDScanExec[T](
+    outputObjAttr: Attribute,
+    rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val outputDataType = outputObjAttr.dataType
+    rdd.mapPartitionsInternal { iter =>
+      val outputObject = ObjectOperator.wrapObjectToRow(outputDataType)
+      iter.map { value =>
+        numOutputRows += 1
+        outputObject(value)
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+  }
+}
+
+/** Logical plan node for scanning data from an RDD of InternalRow. */
 private[sql] case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow])(session: SparkSession)
-  extends LogicalPlan with MultiInstanceRelation {
-
-  override def children: Seq[LogicalPlan] = Nil
+  extends LeafNode with MultiInstanceRelation {
 
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
 
@@ -96,8 +154,6 @@ private[sql] case class LogicalRDD(
 
   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  override def producedAttributes: AttributeSet = outputSet
-
   @transient override lazy val statistics: Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
@@ -105,7 +161,7 @@ private[sql] case class LogicalRDD(
   )
 }
 
-/** Physical plan node for scanning data from an RDD. */
+/** Physical plan node for scanning data from an RDD of InternalRow. */
 private[sql] case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index df2f238d8c..f86f42b1f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -34,8 +34,12 @@ private[sql] case class LocalTableScanExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   private val unsafeRows: Array[InternalRow] = {
-    val proj = UnsafeProjection.create(output, output)
-    rows.map(r => proj(r).copy()).toArray
+    if (rows.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      rows.map(r => proj(r).copy()).toArray
+    }
   }
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e643ea75a..52e19819f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -411,6 +411,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
+      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => planLater(child) :: Nil
       case _ => Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ab505139a8..a9d0fcf1b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -242,6 +242,12 @@ abstract class QueryTest extends PlanTest {
       case _: LogicalRelation => return
      case p if p.getClass.getSimpleName == "MetastoreRelation" => return
       case _: MemoryPlan => return
+      case p: InMemoryRelation =>
+        p.child.transform {
+          case _: ObjectConsumerExec => return
+          case _: ObjectProducerExec => return
+        }
+        p
     }.transformAllExpressions {
       case a: ImperativeAggregate => return
       case _: TypedAggregateExpression => return
--
cgit v1.2.3
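
For context, below is a minimal sketch of the usage pattern this patch targets. It is not part of the patch; the `Record` case class, object name, and master/app-name settings are illustrative only, and the actual removal of the redundant serialize/deserialize pair is left to the optimizer (the existing rule that collapses adjacent serialize/deserialize object nodes), as the PR description says it merely gets "a chance" to happen.

```scala
// Sketch of the motivating use case: create a Dataset from an RDD and immediately
// apply a typed map. Names below (Record, ExternalRDDExample) are illustrative.
import org.apache.spark.sql.SparkSession

object ExternalRDDExample {

  case class Record(id: Long, value: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExternalRDDExample")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd = spark.sparkContext.parallelize(Seq(Record(1L, "a"), Record(2L, "b")))

    // Before this patch, createDataset mapped every Record to an InternalRow up front
    // (serialize), so the typed map below had to decode the rows back into Records
    // (deserialize). With ExternalRDD the input stays as objects in the logical plan,
    // giving the optimizer a chance to drop the redundant round trip.
    val ds = spark.createDataset(rdd)
      .map(r => r.copy(value = r.value.toUpperCase))

    ds.explain(true) // inspect the analyzed, optimized, and physical plans
    ds.show()

    spark.stop()
  }
}
```

With this change, the optimized plan for `ds` should show the scan of the external RDD feeding the typed map directly, with a single serialization step at the end, rather than a serialize-then-deserialize round trip between the scan and the map.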