author     Sameer Agarwal <sameer@databricks.com>  2016-04-18 14:03:40 -0700
committer  Reynold Xin <rxin@databricks.com>       2016-04-18 14:03:40 -0700
commit     8bd8121329cb1bb137e935dec124aa23f0fcf8c5 (patch)
tree       eb305c65cf776ad658515c56e4ed1cfddb5b6783 /sql/core/src
parent     6fc1e72d9b70615bd91b598084406eb1893d6706 (diff)
download   spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.tar.gz
           spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.tar.bz2
           spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.zip
[SPARK-14710][SQL] Rename gen/genCode to genCode/doGenCode to better reflect the semantics
## What changes were proposed in this pull request?

Per rxin's suggestions, this patch renames `s/gen/genCode` and `s/genCode/doGenCode` to better reflect the semantics of these 2 function calls.

## How was this patch tested?

N/A (refactoring only)

Author: Sameer Agarwal <sameer@databricks.com>

Closes #12475 from sameeragarwal/gencode.
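The rename reflects the usual template-method split in codegen: call sites invoke the public `genCode` (formerly `gen`), while expression subclasses override only the protected `doGenCode` hook (formerly `genCode`). Below is a minimal, self-contained sketch of that contract; the `ExprCode`/`CodegenContext` stand-ins are simplified here, and `IntLiteral`/`Demo` are hypothetical, not part of the Spark source.

```scala
// Simplified stand-ins for Spark's codegen types (the real ones live in
// org.apache.spark.sql.catalyst.expressions.codegen and carry more state).
case class ExprCode(code: String, isNull: String, value: String)

class CodegenContext {
  private var counter = 0
  def freshName(prefix: String): String = { counter += 1; s"$prefix$counter" }
}

abstract class Expression {
  // Public entry point (formerly `gen`): owns the shared bookkeeping,
  // then delegates the expression-specific Java source to doGenCode.
  def genCode(ctx: CodegenContext): ExprCode = {
    val ev = ExprCode("", ctx.freshName("isNull"), ctx.freshName("value"))
    ev.copy(code = doGenCode(ctx, ev))
  }
  // Subclass hook (formerly `genCode`), matching the signature seen in
  // the subquery.scala hunk below.
  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): String
}

// A toy expression overriding only the doGenCode hook.
case class IntLiteral(v: Int) extends Expression {
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): String =
    s"final int ${ev.value} = $v; final boolean ${ev.isNull} = false;"
}

object Demo extends App {
  val ev = IntLiteral(42).genCode(new CodegenContext)
  println(ev.code) // e.g. final int value2 = 42; final boolean isNull1 = false;
}
```

Under this split, the mechanical changes in the diff below follow directly: every call site swaps `.gen(ctx)` for `.genCode(ctx)`, and overriders such as `ScalarSubquery` swap `genCode` for `doGenCode`.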
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala              |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala                   |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala           |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala  | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala      |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala                  |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala                 |  4
9 files changed, 35 insertions(+), 32 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 392c48fb7b..3dc2aa33df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -192,7 +192,7 @@ private[sql] case class RowDataSourceScan(
val row = ctx.freshName("row")
ctx.INPUT_ROW = row
ctx.currentVars = null
- val columnsRowInput = exprRows.map(_.gen(ctx))
+ val columnsRowInput = exprRows.map(_.genCode(ctx))
val inputRow = if (outputUnsafeRows) row else null
s"""
|while ($input.hasNext()) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index bd23b7e3ad..cc0382e5d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -149,7 +149,7 @@ case class Expand(
val firstExpr = projections.head(col)
if (sameOutput(col)) {
// This column is the same across all output rows. Just generate code for it here.
- BindReferences.bindReference(firstExpr, child.output).gen(ctx)
+ BindReferences.bindReference(firstExpr, child.output).genCode(ctx)
} else {
val isNull = ctx.freshName("isNull")
val value = ctx.freshName("value")
@@ -166,7 +166,7 @@ case class Expand(
var updateCode = ""
for (col <- exprs.indices) {
if (!sameOutput(col)) {
- val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx)
+ val ev = BindReferences.bindReference(exprs(col), child.output).genCode(ctx)
updateCode +=
s"""
|${ev.code}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 29acc38ab3..12d08c8c45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -118,7 +118,7 @@ trait CodegenSupport extends SparkPlan {
ctx.currentVars = null
ctx.INPUT_ROW = row
output.zipWithIndex.map { case (attr, i) =>
- BoundReference(i, attr.dataType, attr.nullable).gen(ctx)
+ BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
}
} else {
assert(outputVars != null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index f585759e58..d819a65993 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -165,7 +165,7 @@ case class TungstenAggregate(
ctx.addMutableState("boolean", isNull, "")
ctx.addMutableState(ctx.javaType(e.dataType), value, "")
// The initial expression should not access any column
- val ev = e.gen(ctx)
+ val ev = e.genCode(ctx)
val initVars = s"""
| $isNull = ${ev.isNull};
| $value = ${ev.value};
@@ -179,13 +179,13 @@ case class TungstenAggregate(
// evaluate aggregate results
ctx.currentVars = bufVars
val aggResults = functions.map(_.evaluateExpression).map { e =>
- BindReferences.bindReference(e, aggregateBufferAttributes).gen(ctx)
+ BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx)
}
val evaluateAggResults = evaluateVariables(aggResults)
// evaluate result expressions
ctx.currentVars = aggResults
val resultVars = resultExpressions.map { e =>
- BindReferences.bindReference(e, aggregateAttributes).gen(ctx)
+ BindReferences.bindReference(e, aggregateAttributes).genCode(ctx)
}
(resultVars, s"""
|$evaluateAggResults
@@ -196,7 +196,7 @@ case class TungstenAggregate(
(bufVars, "")
} else {
// no aggregate function, the result should be literals
- val resultVars = resultExpressions.map(_.gen(ctx))
+ val resultVars = resultExpressions.map(_.genCode(ctx))
(resultVars, evaluateVariables(resultVars))
}
@@ -240,7 +240,7 @@ case class TungstenAggregate(
}
ctx.currentVars = bufVars ++ input
// TODO: support subexpression elimination
- val aggVals = updateExpr.map(BindReferences.bindReference(_, inputAttrs).gen(ctx))
+ val aggVals = updateExpr.map(BindReferences.bindReference(_, inputAttrs).genCode(ctx))
// aggregate buffer should be updated atomic
val updates = aggVals.zipWithIndex.map { case (ev, i) =>
s"""
@@ -394,25 +394,25 @@ case class TungstenAggregate(
ctx.currentVars = null
ctx.INPUT_ROW = keyTerm
val keyVars = groupingExpressions.zipWithIndex.map { case (e, i) =>
- BoundReference(i, e.dataType, e.nullable).gen(ctx)
+ BoundReference(i, e.dataType, e.nullable).genCode(ctx)
}
val evaluateKeyVars = evaluateVariables(keyVars)
ctx.INPUT_ROW = bufferTerm
val bufferVars = aggregateBufferAttributes.zipWithIndex.map { case (e, i) =>
- BoundReference(i, e.dataType, e.nullable).gen(ctx)
+ BoundReference(i, e.dataType, e.nullable).genCode(ctx)
}
val evaluateBufferVars = evaluateVariables(bufferVars)
// evaluate the aggregation result
ctx.currentVars = bufferVars
val aggResults = declFunctions.map(_.evaluateExpression).map { e =>
- BindReferences.bindReference(e, aggregateBufferAttributes).gen(ctx)
+ BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx)
}
val evaluateAggResults = evaluateVariables(aggResults)
// generate the final result
ctx.currentVars = keyVars ++ aggResults
val inputAttrs = groupingAttributes ++ aggregateAttributes
val resultVars = resultExpressions.map { e =>
- BindReferences.bindReference(e, inputAttrs).gen(ctx)
+ BindReferences.bindReference(e, inputAttrs).genCode(ctx)
}
s"""
$evaluateKeyVars
@@ -437,7 +437,7 @@ case class TungstenAggregate(
ctx.INPUT_ROW = keyTerm
ctx.currentVars = null
val eval = resultExpressions.map{ e =>
- BindReferences.bindReference(e, groupingAttributes).gen(ctx)
+ BindReferences.bindReference(e, groupingAttributes).genCode(ctx)
}
consume(ctx, eval)
}
@@ -576,7 +576,7 @@ case class TungstenAggregate(
// generate hash code for key
val hashExpr = Murmur3Hash(groupingExpressions, 42)
ctx.currentVars = input
- val hashEval = BindReferences.bindReference(hashExpr, child.output).gen(ctx)
+ val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
val inputAttr = aggregateBufferAttributes ++ child.output
ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
@@ -613,7 +613,8 @@ case class TungstenAggregate(
val updateRowInVectorizedHashMap: Option[String] = {
if (isVectorizedHashMapEnabled) {
ctx.INPUT_ROW = vectorizedRowBuffer
- val vectorizedRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+ val vectorizedRowEvals =
+ updateExpr.map(BindReferences.bindReference(_, inputAttr).genCode(ctx))
val updateVectorizedRow = vectorizedRowEvals.zipWithIndex.map { case (ev, i) =>
val dt = updateExpr(i).dataType
ctx.updateColumn(vectorizedRowBuffer, dt, i, ev, updateExpr(i).nullable)
@@ -663,7 +664,7 @@ case class TungstenAggregate(
val updateRowInUnsafeRowMap: String = {
ctx.INPUT_ROW = unsafeRowBuffer
val unsafeRowBufferEvals =
- updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+ updateExpr.map(BindReferences.bindReference(_, inputAttr).genCode(ctx))
val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
val dt = updateExpr(i).dataType
ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 344aaff348..c689fc3fbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -53,7 +53,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
val exprs = projectList.map(x =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
ctx.currentVars = input
- val resultVars = exprs.map(_.gen(ctx))
+ val resultVars = exprs.map(_.genCode(ctx))
// Evaluation of non-deterministic expressions can't be deferred.
val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute)
s"""
@@ -122,7 +122,7 @@ case class Filter(condition: Expression, child: SparkPlan)
val evaluated = evaluateRequiredVariables(child.output, in, c.references)
// Generate the code for the predicate.
- val ev = ExpressionCanonicalizer.execute(bound).gen(ctx)
+ val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
val nullCheck = if (bound.nullable) {
s"${ev.isNull} || "
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index a8f854136c..b94b0d26b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -118,7 +118,7 @@ case class BroadcastHashJoin(
ctx.currentVars = input
if (streamedKeys.length == 1 && streamedKeys.head.dataType == LongType) {
// generate the join key as Long
- val ev = streamedKeys.head.gen(ctx)
+ val ev = streamedKeys.head.genCode(ctx)
(ev, ev.isNull)
} else {
// generate the join key as UnsafeRow
@@ -134,7 +134,7 @@ case class BroadcastHashJoin(
ctx.currentVars = null
ctx.INPUT_ROW = matched
buildPlan.output.zipWithIndex.map { case (a, i) =>
- val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
+ val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
if (joinType == Inner) {
ev
} else {
@@ -170,7 +170,8 @@ case class BroadcastHashJoin(
val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
// filter the output via condition
ctx.currentVars = input ++ buildVars
- val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).gen(ctx)
+ val ev =
+ BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
s"""
|$eval
|${ev.code}
@@ -244,7 +245,8 @@ case class BroadcastHashJoin(
// evaluate the variables from build side that used by condition
val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
ctx.currentVars = input ++ buildVars
- val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).gen(ctx)
+ val ev =
+ BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
s"""
|boolean $conditionPassed = true;
|${eval.trim}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 0e7b2f2f31..443a7b43b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -226,7 +226,7 @@ case class SortMergeJoin(
keys: Seq[Expression],
input: Seq[Attribute]): Seq[ExprCode] = {
ctx.INPUT_ROW = row
- keys.map(BindReferences.bindReference(_, input).gen(ctx))
+ keys.map(BindReferences.bindReference(_, input).genCode(ctx))
}
private def copyKeys(ctx: CodegenContext, vars: Seq[ExprCode]): Seq[ExprCode] = {
@@ -376,7 +376,7 @@ case class SortMergeJoin(
private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = {
ctx.INPUT_ROW = rightRow
right.output.zipWithIndex.map { case (a, i) =>
- BoundReference(i, a.dataType, a.nullable).gen(ctx)
+ BoundReference(i, a.dataType, a.nullable).genCode(ctx)
}
}
@@ -427,7 +427,7 @@ case class SortMergeJoin(
val (rightBefore, rightAfter) = splitVarsByCondition(right.output, rightVars)
// Generate code for condition
ctx.currentVars = leftVars ++ rightVars
- val cond = BindReferences.bindReference(condition.get, output).gen(ctx)
+ val cond = BindReferences.bindReference(condition.get, output).genCode(ctx)
// evaluate the columns those used by condition before loop
val before = s"""
|boolean $loaded = false;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index d2ab18ef0e..784b1e8c26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -48,7 +48,7 @@ case class DeserializeToObject(
val bound = ExpressionCanonicalizer.execute(
BindReferences.bindReference(deserializer, child.output))
ctx.currentVars = input
- val resultVars = bound.gen(ctx) :: Nil
+ val resultVars = bound.genCode(ctx) :: Nil
consume(ctx, resultVars)
}
@@ -82,7 +82,7 @@ case class SerializeFromObject(
ExpressionCanonicalizer.execute(BindReferences.bindReference(expr, child.output))
}
ctx.currentVars = input
- val resultVars = bound.map(_.gen(ctx))
+ val resultVars = bound.map(_.genCode(ctx))
consume(ctx, resultVars)
}
@@ -173,13 +173,13 @@ case class MapElements(
val bound = ExpressionCanonicalizer.execute(
BindReferences.bindReference(callFunc, child.output))
ctx.currentVars = input
- val evaluated = bound.gen(ctx)
+ val evaluated = bound.genCode(ctx)
val resultObj = LambdaVariable(evaluated.value, evaluated.isNull, resultObjType)
val outputFields = serializer.map(_ transform {
case _: BoundReference => resultObj
})
- val resultVars = outputFields.map(_.gen(ctx))
+ val resultVars = outputFields.map(_.genCode(ctx))
s"""
${evaluated.code}
${consume(ctx, resultVars)}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 4b3091ba22..03defc121c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -54,8 +54,8 @@ case class ScalarSubquery(
override def eval(input: InternalRow): Any = result
- override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
- Literal.create(result, dataType).genCode(ctx, ev)
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): String = {
+ Literal.create(result, dataType).doGenCode(ctx, ev)
}
}