author    | Sameer Agarwal <sameer@databricks.com> | 2016-04-18 20:28:58 -0700
committer | Reynold Xin <rxin@databricks.com>      | 2016-04-18 20:28:58 -0700
commit    | 6f8800689530591c8db856ade1f771fb30a12219
tree      | 0baaef587623e59b9a552846beafc7b6dd160fae /sql
parent    | 4eae1dbd7c4ec96727f92dd52e3fb9b26b0ec883
[SPARK-14722][SQL] Rename upstreams() -> inputRDDs() in WholeStageCodegen
## What changes were proposed in this pull request?
Per rxin's suggestion, this patch renames `upstreams()` to `inputRDDs()` in `WholeStageCodegen`, so that the name better conveys the method's semantics: it returns the input RDDs that feed the generated pipeline.
## How was this patch tested?
N/A
Author: Sameer Agarwal <sameer@databricks.com>
Closes #12486 from sameeragarwal/codegen-cleanup.
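For context, here is a minimal sketch of the renamed contract and the delegation pattern that most hunks below repeat. This is not the actual Spark source: `RDD`, `InternalRow`, and `SparkPlan` are reduced to stubs so the snippet compiles standalone, and the `Filter` shown is a simplified stand-in for the real operator.

```scala
// Stub stand-ins for Spark classes, so this sketch is self-contained.
class InternalRow
class RDD[T]

trait SparkPlan {
  def children: Seq[SparkPlan]
}

// The trait this patch touches: operators participating in whole-stage
// codegen must expose the RDDs that feed the generated pipeline.
trait CodegenSupport extends SparkPlan {
  // Formerly `upstreams()`; the new name says what is returned (the input
  // RDDs of the generated code), not merely where it comes from.
  def inputRDDs(): Seq[RDD[InternalRow]]
}

// A typical unary operator just forwards its child's input RDDs; this
// one-line delegation is the change repeated across most files in the diff.
case class Filter(child: SparkPlan) extends CodegenSupport {
  override def children: Seq[SparkPlan] = child :: Nil
  override def inputRDDs(): Seq[RDD[InternalRow]] =
    child.asInstanceOf[CodegenSupport].inputRDDs()
}
```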
Diffstat (limited to 'sql')
11 files changed, 37 insertions, 37 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3dc2aa33df..1dc1b51e94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -177,7 +177,7 @@ private[sql] case class RowDataSourceScan(
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     rdd :: Nil
   }
 
@@ -228,7 +228,7 @@ private[sql] case class BatchedDataSourceScan(
     s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     rdd :: Nil
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index cc0382e5d4..3966af542e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -85,8 +85,8 @@ case class Expand(
     }
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index efd8760cd2..80255fafbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -100,8 +100,8 @@ case class Sort(
 
   override def usedInputs: AttributeSet = AttributeSet(Seq.empty)
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   // Name of sorter variable used in codegen.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 12d08c8c45..29b66e3dee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -74,10 +74,10 @@ trait CodegenSupport extends SparkPlan {
    *
    * Note: right now we support up to two RDDs.
    */
-  def upstreams(): Seq[RDD[InternalRow]]
+  def inputRDDs(): Seq[RDD[InternalRow]]
 
   /**
-   * Returns Java source code to process the rows from upstream.
+   * Returns Java source code to process the rows from input RDD.
    */
   final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
     this.parent = parent
@@ -234,13 +234,13 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
     child.doExecuteBroadcast()
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.execute() :: Nil
   }
 
   override def doProduce(ctx: CodegenContext): String = {
     val input = ctx.freshName("input")
-    // Right now, InputAdapter is only used when there is one upstream.
+    // Right now, InputAdapter is only used when there is one input RDD.
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     val row = ctx.freshName("row")
     s"""
@@ -272,7 +272,7 @@ object WholeStageCodegen {
  *
  *   -> execute()
  *       |
- *    doExecute() --------->   upstreams() -------> upstreams() ------> execute()
+ *    doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
  *       |
  *       +----------------->   produce()
  *                               |
@@ -350,8 +350,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
     val durationMs = longMetric("pipelineTime")
 
-    val rdds = child.asInstanceOf[CodegenSupport].upstreams()
-    assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
+    val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
+    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
     if (rdds.length == 1) {
       rdds.head.mapPartitionsWithIndex { (index, iter) =>
         val clazz = CodeGenerator.compile(cleanedSource)
@@ -367,7 +367,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
         }
       }
     } else {
-      // Right now, we support up to two upstreams.
+      // Right now, we support up to two input RDDs.
       rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
         val partitionIndex = TaskContext.getPartitionId()
         val clazz = CodeGenerator.compile(cleanedSource)
@@ -385,7 +385,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
       }
     }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     throw new UnsupportedOperationException
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index d819a65993..89977f9e08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -129,8 +129,8 @@ case class TungstenAggregate(
     !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index c689fc3fbb..892c57ae7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -31,8 +31,8 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
@@ -103,8 +103,8 @@ case class Filter(condition: Expression, child: SparkPlan)
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
@@ -243,8 +243,8 @@ case class Sample(
     }
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
@@ -315,7 +315,7 @@ case class Range(
   // output attributes should not affect the results
   override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
       .map(i => InternalRow(i)) :: Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 17eae88b49..e6079ecaad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -164,8 +164,8 @@ package object debug {
       }
     }
 
-    override def upstreams(): Seq[RDD[InternalRow]] = {
-      child.asInstanceOf[CodegenSupport].upstreams()
+    override def inputRDDs(): Seq[RDD[InternalRow]] = {
+      child.asInstanceOf[CodegenSupport].inputRDDs()
     }
 
     override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index b94b0d26b2..89487c6b87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -71,8 +71,8 @@ case class BroadcastHashJoin(
     }
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    streamedPlan.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    streamedPlan.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 443a7b43b6..4e45fd6560 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -216,7 +216,7 @@ case class SortMergeJoin(
     joinType == Inner
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
     left.execute() :: right.execute() :: Nil
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 9643b52f96..c9a14593fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -57,8 +57,8 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
     iter.take(limit)
   }
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 784b1e8c26..e7261fc512 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -36,8 +36,8 @@ case class DeserializeToObject(
     child: SparkPlan) extends UnaryNode with CodegenSupport {
   override def output: Seq[Attribute] = deserializer.toAttribute :: Nil
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
@@ -69,8 +69,8 @@ case class SerializeFromObject(
     child: SparkPlan) extends UnaryNode with CodegenSupport {
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
@@ -153,8 +153,8 @@ case class MapElements(
     child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport {
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
 
-  override def upstreams(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].upstreams()
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
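The main consumer of the renamed method is `WholeStageCodegen.doExecute()`, visible in the hunks above: a single input RDD is processed with `mapPartitionsWithIndex`, while two inputs (the sort-merge-join case) are paired with `zipPartitions`. Below is a minimal sketch of that dispatch under stated assumptions: the toy `RDD` here only mirrors the shapes of the two Spark combinators, and `process` stands in for the compiled whole-stage pipeline (the real code compiles generated Java via `CodeGenerator.compile`).

```scala
// Toy RDD exposing just the two combinators the dispatch relies on.
// (Method shapes mirror Spark's RDD API; the bodies are stand-ins.)
class RDD[T](val parts: Seq[Seq[T]]) {
  def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
    new RDD(parts.zipWithIndex.map { case (p, i) => f(i, p.iterator).toSeq })

  def zipPartitions[B, U](other: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[U]): RDD[U] =
    new RDD(parts.zip(other.parts).map { case (a, b) => f(a.iterator, b.iterator).toSeq })
}

// Sketch of the up-to-two-inputs dispatch in doExecute(); `process` is a
// hypothetical placeholder for the compiled pipeline.
def doExecuteSketch(rdds: Seq[RDD[Int]], process: Iterator[Int] => Iterator[Int]): RDD[Int] = {
  assert(rdds.size <= 2, "Up to two input RDDs can be supported")
  if (rdds.length == 1) {
    // Single input: run the pipeline over each partition independently.
    rdds.head.mapPartitionsWithIndex((_, iter) => process(iter))
  } else {
    // Two inputs (e.g. SortMergeJoin): pair corresponding partitions.
    rdds.head.zipPartitions(rdds(1))((left, right) => process(left ++ right))
  }
}

// Usage: one input RDD with two partitions, pipeline multiplies by 10.
val out = doExecuteSketch(Seq(new RDD(Seq(Seq(1, 2), Seq(3)))), _.map(_ * 10))
// out.parts == Seq(Seq(10, 20), Seq(30))
```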