author     Sameer Agarwal <sameer@databricks.com>    2016-04-18 20:28:58 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-04-18 20:28:58 -0700
commit     6f8800689530591c8db856ade1f771fb30a12219 (patch)
tree       0baaef587623e59b9a552846beafc7b6dd160fae /sql/core/src/main/scala/org/apache
parent     4eae1dbd7c4ec96727f92dd52e3fb9b26b0ec883 (diff)
[SPARK-14722][SQL] Rename upstreams() -> inputRDDs() in WholeStageCodegen
## What changes were proposed in this pull request?

Per rxin's suggestions, this patch renames `upstreams()` to `inputRDDs()` in `WholeStageCodegen` for better implied semantics.

## How was this patch tested?

N/A

Author: Sameer Agarwal <sameer@databricks.com>

Closes #12486 from sameeragarwal/codegen-cleanup.
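For orientation, here is a minimal standalone sketch of the renamed hook and the delegation pattern that most of the touched operators follow. The names `SketchPlan`, `ScanSketch`, and `PassThroughSketch` are hypothetical stand-ins, not Spark classes; the real hook lives in the `CodegenSupport` trait shown in the diff below.

```scala
// Hypothetical stand-ins (not Spark classes) illustrating the renamed hook:
// leaf scans return their own RDD, unary operators delegate to their child,
// so the leaf RDDs propagate up through the whole fused pipeline.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

trait SketchPlan {
  /** RDDs whose iterators feed the generated code (at most two are supported). */
  def inputRDDs(): Seq[RDD[InternalRow]]
}

case class ScanSketch(rdd: RDD[InternalRow]) extends SketchPlan {
  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
}

case class PassThroughSketch(child: SketchPlan) extends SketchPlan {
  override def inputRDDs(): Seq[RDD[InternalRow]] = child.inputRDDs()
}
```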
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala                       |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala                         |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala            | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala               | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala                |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala      |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala          |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala                        |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala                      | 12
11 files changed, 37 insertions, 37 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3dc2aa33df..1dc1b51e94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -177,7 +177,7 @@ private[sql] case class RowDataSourceScan(
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}
@@ -228,7 +228,7 @@ private[sql] case class BatchedDataSourceScan(
s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index cc0382e5d4..3966af542e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -85,8 +85,8 @@ case class Expand(
}
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index efd8760cd2..80255fafbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -100,8 +100,8 @@ case class Sort(
override def usedInputs: AttributeSet = AttributeSet(Seq.empty)
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
// Name of sorter variable used in codegen.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 12d08c8c45..29b66e3dee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -74,10 +74,10 @@ trait CodegenSupport extends SparkPlan {
*
* Note: right now we support up to two RDDs.
*/
- def upstreams(): Seq[RDD[InternalRow]]
+ def inputRDDs(): Seq[RDD[InternalRow]]
/**
- * Returns Java source code to process the rows from upstream.
+ * Returns Java source code to process the rows from input RDD.
*/
final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
this.parent = parent
@@ -234,13 +234,13 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
child.doExecuteBroadcast()
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.execute() :: Nil
}
override def doProduce(ctx: CodegenContext): String = {
val input = ctx.freshName("input")
- // Right now, InputAdapter is only used when there is one upstream.
+ // Right now, InputAdapter is only used when there is one input RDD.
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
val row = ctx.freshName("row")
s"""
@@ -272,7 +272,7 @@ object WholeStageCodegen {
*
* -> execute()
* |
- * doExecute() ---------> upstreams() -------> upstreams() ------> execute()
+ * doExecute() ---------> inputRDDs() -------> inputRDDs() ------> execute()
* |
* +-----------------> produce()
* |
@@ -350,8 +350,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
val durationMs = longMetric("pipelineTime")
- val rdds = child.asInstanceOf[CodegenSupport].upstreams()
- assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
+ val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
+ assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
rdds.head.mapPartitionsWithIndex { (index, iter) =>
val clazz = CodeGenerator.compile(cleanedSource)
@@ -367,7 +367,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
}
}
} else {
- // Right now, we support up to two upstreams.
+ // Right now, we support up to two input RDDs.
rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
val partitionIndex = TaskContext.getPartitionId()
val clazz = CodeGenerator.compile(cleanedSource)
@@ -385,7 +385,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
}
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
throw new UnsupportedOperationException
}
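The `doExecute()` hunk above keeps the existing dispatch on the number of input RDDs; the following standalone sketch shows that shape with the plain RDD API. `PipelineSketch`, `runPipeline`, and `process` are hypothetical stand-ins for the actual generated-code plumbing, not WholeStageCodegen itself.

```scala
// Standalone sketch (not the actual WholeStageCodegen code): a single input RDD
// is processed with mapPartitionsWithIndex, while two input RDDs (e.g. the two
// sides of a sort-merge join) are combined partition-by-partition with
// zipPartitions.
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object PipelineSketch {
  def runPipeline[T: ClassTag](rdds: Seq[RDD[T]])(
      process: (Iterator[T], Iterator[T]) => Iterator[T]): RDD[T] = {
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
      // Single input: the second iterator slot stays empty.
      rdds.head.mapPartitionsWithIndex { (_, iter) => process(iter, Iterator.empty) }
    } else {
      // Two inputs: pair up corresponding partitions from both RDDs.
      rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
        process(leftIter, rightIter)
      }
    }
  }
}
```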
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index d819a65993..89977f9e08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -129,8 +129,8 @@ case class TungstenAggregate(
!aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate])
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index c689fc3fbb..892c57ae7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -31,8 +31,8 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
@@ -103,8 +103,8 @@ case class Filter(condition: Expression, child: SparkPlan)
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
@@ -243,8 +243,8 @@ case class Sample(
}
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
@@ -315,7 +315,7 @@ case class Range(
// output attributes should not affect the results
override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
.map(i => InternalRow(i)) :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 17eae88b49..e6079ecaad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -164,8 +164,8 @@ package object debug {
}
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index b94b0d26b2..89487c6b87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -71,8 +71,8 @@ case class BroadcastHashJoin(
}
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- streamedPlan.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ streamedPlan.asInstanceOf[CodegenSupport].inputRDDs()
}
override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 443a7b43b6..4e45fd6560 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -216,7 +216,7 @@ case class SortMergeJoin(
joinType == Inner
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
left.execute() :: right.execute() :: Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 9643b52f96..c9a14593fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -57,8 +57,8 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
iter.take(limit)
}
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 784b1e8c26..e7261fc512 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -36,8 +36,8 @@ case class DeserializeToObject(
child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = deserializer.toAttribute :: Nil
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
@@ -69,8 +69,8 @@ case class SerializeFromObject(
child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
@@ -153,8 +153,8 @@ case class MapElements(
child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport {
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
- override def upstreams(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].upstreams()
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {