author     Takuya UESHIN <ueshin@happy-camper.st>    2016-07-12 17:16:59 +0800
committer  Wenchen Fan <wenchen@databricks.com>      2016-07-12 17:16:59 +0800
commit     5b28e02584fa4da85214e7da6d77b3b8e189b781 (patch)
tree       a89c74943d5b39fc6af322ff2b24b21c7ba75660 /sql
parent     fc11c509e234c5414687f7fbd13af113a1f52f10 (diff)
download   spark-5b28e02584fa4da85214e7da6d77b3b8e189b781.tar.gz
           spark-5b28e02584fa4da85214e7da6d77b3b8e189b781.tar.bz2
           spark-5b28e02584fa4da85214e7da6d77b3b8e189b781.zip
[SPARK-16189][SQL] Add ExternalRDD logical plan for input with RDD to have a chance to eliminate serialize/deserialize.
## What changes were proposed in this pull request?

Currently the input `RDD` of a `Dataset` is always serialized to `RDD[InternalRow]` before being used as a `Dataset`, but there are cases where we call `map` or `mapPartitions` right after converting to a `Dataset`. In those cases a serialize followed by a deserialize happens even though neither would be needed. This PR adds an `ExternalRDD` logical plan for `RDD` input to give the optimizer a chance to eliminate the serialize/deserialize pair.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13890 from ueshin/issues/SPARK-16189.
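For illustration, a minimal sketch of the pattern this change targets (hypothetical example, not part of the patch; assumes a local `SparkSession` named `spark`): converting an RDD to a Dataset and mapping over it right away. Before this change `createDataset` eagerly encoded every element to `InternalRow` and the following `map` immediately decoded it again; with `ExternalRDD` as the logical input node, the adjacent serialize/deserialize steps can be matched and dropped by the optimizer.

import org.apache.spark.sql.SparkSession

object ExternalRDDSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup, not part of the patch.
    val spark = SparkSession.builder()
      .appName("external-rdd-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))

    // createDataset(rdd) now plans an ExternalRDD node that produces raw objects;
    // the trailing map works on those objects, so the encode/decode round trip
    // between the two steps can be eliminated by the optimizer.
    val ds = spark.createDataset(rdd).map(_ + 1)
    ds.show()

    spark.stop()
  }
}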
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala                 | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala        | 72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala    | 1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala                    | 8
5 files changed, 81 insertions, 20 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a3fd39d42e..1271d1c55b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -237,10 +237,8 @@ class SparkSession private(
@Experimental
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
SparkSession.setActiveSession(this)
- val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
- val attributeSeq = schema.toAttributes
- val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
- Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRDD)(self))
+ val encoder = Encoders.product[A]
+ Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
}
/**
@@ -425,11 +423,7 @@ class SparkSession private(
*/
@Experimental
def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
- val enc = encoderFor[T]
- val attributes = enc.schema.toAttributes
- val encoded = data.map(d => enc.toRow(d))
- val plan = LogicalRDD(attributes, encoded)(self)
- Dataset[T](self, plan)
+ Dataset[T](self, ExternalRDD(data, self))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 09203e6998..491c2742ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Encoder, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -74,13 +74,71 @@ object RDDConversions {
}
}
+private[sql] object ExternalRDD {
+
+ def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
+ val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
+ CatalystSerde.serialize[T](externalRdd)
+ }
+}
+
/** Logical plan node for scanning data from an RDD. */
+private[sql] case class ExternalRDD[T](
+ outputObjAttr: Attribute,
+ rdd: RDD[T])(session: SparkSession)
+ extends LeafNode with ObjectProducer with MultiInstanceRelation {
+
+ override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
+
+ override def newInstance(): ExternalRDD.this.type =
+ ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
+
+ override def sameResult(plan: LogicalPlan): Boolean = {
+ plan.canonicalized match {
+ case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
+ case _ => false
+ }
+ }
+
+ override protected def stringArgs: Iterator[Any] = Iterator(output)
+
+ @transient override lazy val statistics: Statistics = Statistics(
+ // TODO: Instead of returning a default value here, find a way to return a meaningful size
+ // estimate for RDDs. See PR 1238 for more discussions.
+ sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+ )
+}
+
+/** Physical plan node for scanning data from an RDD. */
+private[sql] case class ExternalRDDScanExec[T](
+ outputObjAttr: Attribute,
+ rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {
+
+ private[sql] override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ val outputDataType = outputObjAttr.dataType
+ rdd.mapPartitionsInternal { iter =>
+ val outputObject = ObjectOperator.wrapObjectToRow(outputDataType)
+ iter.map { value =>
+ numOutputRows += 1
+ outputObject(value)
+ }
+ }
+ }
+
+ override def simpleString: String = {
+ s"Scan $nodeName${output.mkString("[", ",", "]")}"
+ }
+}
+
+/** Logical plan node for scanning data from an RDD of InternalRow. */
private[sql] case class LogicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow])(session: SparkSession)
- extends LogicalPlan with MultiInstanceRelation {
-
- override def children: Seq[LogicalPlan] = Nil
+ extends LeafNode with MultiInstanceRelation {
override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -96,8 +154,6 @@ private[sql] case class LogicalRDD(
override protected def stringArgs: Iterator[Any] = Iterator(output)
- override def producedAttributes: AttributeSet = outputSet
-
@transient override lazy val statistics: Statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
@@ -105,7 +161,7 @@ private[sql] case class LogicalRDD(
)
}
-/** Physical plan node for scanning data from an RDD. */
+/** Physical plan node for scanning data from an RDD of InternalRow. */
private[sql] case class RDDScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index df2f238d8c..f86f42b1f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -34,8 +34,12 @@ private[sql] case class LocalTableScanExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
private val unsafeRows: Array[InternalRow] = {
- val proj = UnsafeProjection.create(output, output)
- rows.map(r => proj(r).copy()).toArray
+ if (rows.isEmpty) {
+ Array.empty
+ } else {
+ val proj = UnsafeProjection.create(output, output)
+ rows.map(r => proj(r).copy()).toArray
+ }
}
private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e643ea75a..52e19819f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -411,6 +411,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
+ case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil
case BroadcastHint(child) => planLater(child) :: Nil
case _ => Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ab505139a8..a9d0fcf1b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -242,6 +242,12 @@ abstract class QueryTest extends PlanTest {
case _: LogicalRelation => return
case p if p.getClass.getSimpleName == "MetastoreRelation" => return
case _: MemoryPlan => return
+ case p: InMemoryRelation =>
+ p.child.transform {
+ case _: ObjectConsumerExec => return
+ case _: ObjectProducerExec => return
+ }
+ p
}.transformAllExpressions {
case a: ImperativeAggregate => return
case _: TypedAggregateExpression => return