diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-04-18 14:03:40 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-18 14:03:40 -0700 |
commit | 8bd8121329cb1bb137e935dec124aa23f0fcf8c5 (patch) | |
tree | eb305c65cf776ad658515c56e4ed1cfddb5b6783 /sql/core/src | |
parent | 6fc1e72d9b70615bd91b598084406eb1893d6706 (diff) | |
download | spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.tar.gz spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.tar.bz2 spark-8bd8121329cb1bb137e935dec124aa23f0fcf8c5.zip |
[SPARK-14710][SQL] Rename gen/genCode to genCode/doGenCode to better reflect the semantics
## What changes were proposed in this pull request?
Per rxin's suggestions, this patch renames `gen` to `genCode` and `genCode` to `doGenCode` to better reflect the semantics of these 2 function calls.
## How was this patch tested?
N/A (refactoring only)
Author: Sameer Agarwal <sameer@databricks.com>
Closes #12475 from sameeragarwal/gencode.
Diffstat (limited to 'sql/core/src')
9 files changed, 35 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 392c48fb7b..3dc2aa33df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -192,7 +192,7 @@ private[sql] case class RowDataSourceScan( val row = ctx.freshName("row") ctx.INPUT_ROW = row ctx.currentVars = null - val columnsRowInput = exprRows.map(_.gen(ctx)) + val columnsRowInput = exprRows.map(_.genCode(ctx)) val inputRow = if (outputUnsafeRows) row else null s""" |while ($input.hasNext()) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index bd23b7e3ad..cc0382e5d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -149,7 +149,7 @@ case class Expand( val firstExpr = projections.head(col) if (sameOutput(col)) { // This column is the same across all output rows. Just generate code for it here. 
- BindReferences.bindReference(firstExpr, child.output).gen(ctx) + BindReferences.bindReference(firstExpr, child.output).genCode(ctx) } else { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") @@ -166,7 +166,7 @@ case class Expand( var updateCode = "" for (col <- exprs.indices) { if (!sameOutput(col)) { - val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx) + val ev = BindReferences.bindReference(exprs(col), child.output).genCode(ctx) updateCode += s""" |${ev.code} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 29acc38ab3..12d08c8c45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -118,7 +118,7 @@ trait CodegenSupport extends SparkPlan { ctx.currentVars = null ctx.INPUT_ROW = row output.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable).gen(ctx) + BoundReference(i, attr.dataType, attr.nullable).genCode(ctx) } } else { assert(outputVars != null) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index f585759e58..d819a65993 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -165,7 +165,7 @@ case class TungstenAggregate( ctx.addMutableState("boolean", isNull, "") ctx.addMutableState(ctx.javaType(e.dataType), value, "") // The initial expression should not access any column - val ev = e.gen(ctx) + val ev = e.genCode(ctx) val initVars = s""" | $isNull = ${ev.isNull}; | $value = ${ev.value}; @@ -179,13 +179,13 @@ case class TungstenAggregate( // 
evaluate aggregate results ctx.currentVars = bufVars val aggResults = functions.map(_.evaluateExpression).map { e => - BindReferences.bindReference(e, aggregateBufferAttributes).gen(ctx) + BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx) } val evaluateAggResults = evaluateVariables(aggResults) // evaluate result expressions ctx.currentVars = aggResults val resultVars = resultExpressions.map { e => - BindReferences.bindReference(e, aggregateAttributes).gen(ctx) + BindReferences.bindReference(e, aggregateAttributes).genCode(ctx) } (resultVars, s""" |$evaluateAggResults @@ -196,7 +196,7 @@ case class TungstenAggregate( (bufVars, "") } else { // no aggregate function, the result should be literals - val resultVars = resultExpressions.map(_.gen(ctx)) + val resultVars = resultExpressions.map(_.genCode(ctx)) (resultVars, evaluateVariables(resultVars)) } @@ -240,7 +240,7 @@ case class TungstenAggregate( } ctx.currentVars = bufVars ++ input // TODO: support subexpression elimination - val aggVals = updateExpr.map(BindReferences.bindReference(_, inputAttrs).gen(ctx)) + val aggVals = updateExpr.map(BindReferences.bindReference(_, inputAttrs).genCode(ctx)) // aggregate buffer should be updated atomic val updates = aggVals.zipWithIndex.map { case (ev, i) => s""" @@ -394,25 +394,25 @@ case class TungstenAggregate( ctx.currentVars = null ctx.INPUT_ROW = keyTerm val keyVars = groupingExpressions.zipWithIndex.map { case (e, i) => - BoundReference(i, e.dataType, e.nullable).gen(ctx) + BoundReference(i, e.dataType, e.nullable).genCode(ctx) } val evaluateKeyVars = evaluateVariables(keyVars) ctx.INPUT_ROW = bufferTerm val bufferVars = aggregateBufferAttributes.zipWithIndex.map { case (e, i) => - BoundReference(i, e.dataType, e.nullable).gen(ctx) + BoundReference(i, e.dataType, e.nullable).genCode(ctx) } val evaluateBufferVars = evaluateVariables(bufferVars) // evaluate the aggregation result ctx.currentVars = bufferVars val aggResults = 
declFunctions.map(_.evaluateExpression).map { e => - BindReferences.bindReference(e, aggregateBufferAttributes).gen(ctx) + BindReferences.bindReference(e, aggregateBufferAttributes).genCode(ctx) } val evaluateAggResults = evaluateVariables(aggResults) // generate the final result ctx.currentVars = keyVars ++ aggResults val inputAttrs = groupingAttributes ++ aggregateAttributes val resultVars = resultExpressions.map { e => - BindReferences.bindReference(e, inputAttrs).gen(ctx) + BindReferences.bindReference(e, inputAttrs).genCode(ctx) } s""" $evaluateKeyVars @@ -437,7 +437,7 @@ case class TungstenAggregate( ctx.INPUT_ROW = keyTerm ctx.currentVars = null val eval = resultExpressions.map{ e => - BindReferences.bindReference(e, groupingAttributes).gen(ctx) + BindReferences.bindReference(e, groupingAttributes).genCode(ctx) } consume(ctx, eval) } @@ -576,7 +576,7 @@ case class TungstenAggregate( // generate hash code for key val hashExpr = Murmur3Hash(groupingExpressions, 42) ctx.currentVars = input - val hashEval = BindReferences.bindReference(hashExpr, child.output).gen(ctx) + val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input @@ -613,7 +613,8 @@ case class TungstenAggregate( val updateRowInVectorizedHashMap: Option[String] = { if (isVectorizedHashMapEnabled) { ctx.INPUT_ROW = vectorizedRowBuffer - val vectorizedRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) + val vectorizedRowEvals = + updateExpr.map(BindReferences.bindReference(_, inputAttr).genCode(ctx)) val updateVectorizedRow = vectorizedRowEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType ctx.updateColumn(vectorizedRowBuffer, dt, i, ev, updateExpr(i).nullable) @@ -663,7 +664,7 @@ case class TungstenAggregate( val updateRowInUnsafeRowMap: String = { ctx.INPUT_ROW = unsafeRowBuffer val 
unsafeRowBufferEvals = - updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) + updateExpr.map(BindReferences.bindReference(_, inputAttr).genCode(ctx)) val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 344aaff348..c689fc3fbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -53,7 +53,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) val exprs = projectList.map(x => ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) ctx.currentVars = input - val resultVars = exprs.map(_.gen(ctx)) + val resultVars = exprs.map(_.genCode(ctx)) // Evaluation of non-deterministic expressions can't be deferred. val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute) s""" @@ -122,7 +122,7 @@ case class Filter(condition: Expression, child: SparkPlan) val evaluated = evaluateRequiredVariables(child.output, in, c.references) // Generate the code for the predicate. 
- val ev = ExpressionCanonicalizer.execute(bound).gen(ctx) + val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx) val nullCheck = if (bound.nullable) { s"${ev.isNull} || " } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index a8f854136c..b94b0d26b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -118,7 +118,7 @@ case class BroadcastHashJoin( ctx.currentVars = input if (streamedKeys.length == 1 && streamedKeys.head.dataType == LongType) { // generate the join key as Long - val ev = streamedKeys.head.gen(ctx) + val ev = streamedKeys.head.genCode(ctx) (ev, ev.isNull) } else { // generate the join key as UnsafeRow @@ -134,7 +134,7 @@ case class BroadcastHashJoin( ctx.currentVars = null ctx.INPUT_ROW = matched buildPlan.output.zipWithIndex.map { case (a, i) => - val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx) + val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx) if (joinType == Inner) { ev } else { @@ -170,7 +170,8 @@ case class BroadcastHashJoin( val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references) // filter the output via condition ctx.currentVars = input ++ buildVars - val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).gen(ctx) + val ev = + BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx) s""" |$eval |${ev.code} @@ -244,7 +245,8 @@ case class BroadcastHashJoin( // evaluate the variables from build side that used by condition val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references) ctx.currentVars = input ++ buildVars - val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).gen(ctx) + val ev = + 
BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx) s""" |boolean $conditionPassed = true; |${eval.trim} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 0e7b2f2f31..443a7b43b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -226,7 +226,7 @@ case class SortMergeJoin( keys: Seq[Expression], input: Seq[Attribute]): Seq[ExprCode] = { ctx.INPUT_ROW = row - keys.map(BindReferences.bindReference(_, input).gen(ctx)) + keys.map(BindReferences.bindReference(_, input).genCode(ctx)) } private def copyKeys(ctx: CodegenContext, vars: Seq[ExprCode]): Seq[ExprCode] = { @@ -376,7 +376,7 @@ case class SortMergeJoin( private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = { ctx.INPUT_ROW = rightRow right.output.zipWithIndex.map { case (a, i) => - BoundReference(i, a.dataType, a.nullable).gen(ctx) + BoundReference(i, a.dataType, a.nullable).genCode(ctx) } } @@ -427,7 +427,7 @@ case class SortMergeJoin( val (rightBefore, rightAfter) = splitVarsByCondition(right.output, rightVars) // Generate code for condition ctx.currentVars = leftVars ++ rightVars - val cond = BindReferences.bindReference(condition.get, output).gen(ctx) + val cond = BindReferences.bindReference(condition.get, output).genCode(ctx) // evaluate the columns those used by condition before loop val before = s""" |boolean $loaded = false; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index d2ab18ef0e..784b1e8c26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -48,7 +48,7 @@ case 
class DeserializeToObject( val bound = ExpressionCanonicalizer.execute( BindReferences.bindReference(deserializer, child.output)) ctx.currentVars = input - val resultVars = bound.gen(ctx) :: Nil + val resultVars = bound.genCode(ctx) :: Nil consume(ctx, resultVars) } @@ -82,7 +82,7 @@ case class SerializeFromObject( ExpressionCanonicalizer.execute(BindReferences.bindReference(expr, child.output)) } ctx.currentVars = input - val resultVars = bound.map(_.gen(ctx)) + val resultVars = bound.map(_.genCode(ctx)) consume(ctx, resultVars) } @@ -173,13 +173,13 @@ case class MapElements( val bound = ExpressionCanonicalizer.execute( BindReferences.bindReference(callFunc, child.output)) ctx.currentVars = input - val evaluated = bound.gen(ctx) + val evaluated = bound.genCode(ctx) val resultObj = LambdaVariable(evaluated.value, evaluated.isNull, resultObjType) val outputFields = serializer.map(_ transform { case _: BoundReference => resultObj }) - val resultVars = outputFields.map(_.gen(ctx)) + val resultVars = outputFields.map(_.genCode(ctx)) s""" ${evaluated.code} ${consume(ctx, resultVars)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 4b3091ba22..03defc121c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -54,8 +54,8 @@ case class ScalarSubquery( override def eval(input: InternalRow): Any = result - override def genCode(ctx: CodegenContext, ev: ExprCode): String = { - Literal.create(result, dataType).genCode(ctx, ev) + override def doGenCode(ctx: CodegenContext, ev: ExprCode): String = { + Literal.create(result, dataType).doGenCode(ctx, ev) } } |