diff options
Diffstat (limited to 'sql/core/src')
6 files changed, 123 insertions, 79 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1a09d70fb9..3c708cbf29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2251,16 +2251,16 @@ class Dataset[T] private[sql]( def unpersist(): this.type = unpersist(blocking = false) /** - * Represents the content of the [[Dataset]] as an [[RDD]] of [[Row]]s. Note that the RDD is - * memoized. Once called, it won't change even if you change any query planning related Spark SQL - * configurations (e.g. `spark.sql.shuffle.partitions`). + * Represents the content of the [[Dataset]] as an [[RDD]] of [[T]]. * * @group rdd * @since 1.6.0 */ lazy val rdd: RDD[T] = { - queryExecution.toRdd.mapPartitions { rows => - rows.map(boundTEncoder.fromRow) + val objectType = unresolvedTEncoder.deserializer.dataType + val deserialized = CatalystSerde.deserialize[T](logicalPlan) + sqlContext.executePlan(deserialized).toRdd.mapPartitions { rows => + rows.map(_.get(0, objectType).asInstanceOf[T]) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c15aaed365..a4b0fa59db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -346,21 +346,23 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") - case logical.DeserializeToObject(deserializer, child) => - execution.DeserializeToObject(deserializer, planLater(child)) :: Nil + case logical.DeserializeToObject(deserializer, objAttr, child) => + execution.DeserializeToObject(deserializer, objAttr, planLater(child)) :: Nil case logical.SerializeFromObject(serializer, child) => execution.SerializeFromObject(serializer, planLater(child)) :: Nil - case logical.MapPartitions(f, in, out, child) => - execution.MapPartitions(f, in, out, planLater(child)) :: Nil - case logical.MapElements(f, in, out, child) => - execution.MapElements(f, in, out, planLater(child)) :: Nil + case logical.MapPartitions(f, objAttr, child) => + execution.MapPartitions(f, objAttr, planLater(child)) :: Nil + case logical.MapElements(f, objAttr, child) => + execution.MapElements(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, in, out, child) => execution.AppendColumns(f, in, out, planLater(child)) :: Nil - case logical.MapGroups(f, key, in, out, grouping, data, child) => - execution.MapGroups(f, key, in, out, grouping, data, planLater(child)) :: Nil - case logical.CoGroup(f, keyObj, lObj, rObj, out, lGroup, rGroup, lAttr, rAttr, left, right) => + case logical.AppendColumnsWithObject(f, childSer, newSer, child) => + execution.AppendColumnsWithObject(f, childSer, newSer, planLater(child)) :: Nil + case logical.MapGroups(f, key, value, grouping, data, objAttr, child) => + execution.MapGroups(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil + case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) => execution.CoGroup( - f, keyObj, lObj, rObj, out, lGroup, rGroup, lAttr, rAttr, + f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, planLater(left), planLater(right)) :: Nil case logical.Repartition(numPartitions, shuffle, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 46eaede5e7..23b2eabd0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -473,6 +473,10 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { * Inserts a WholeStageCodegen on top of those that support codegen. */ private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match { + // For operators that will output domain object, do not insert WholeStageCodegen for it as + // domain object can not be written into unsafe row. + case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => + plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) case plan: CodegenSupport if supportCodegen(plan) => WholeStageCodegen(insertInputAdapter(plan)) case other => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index e7261fc512..7c8bc7fed8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -25,16 +25,19 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.types.ObjectType +import org.apache.spark.sql.types.{DataType, ObjectType} /** * Takes the input row from child and turns it into object using the given deserializer expression. * The output of this operator is a single-field safe row containing the deserialized object. */ case class DeserializeToObject( - deserializer: Alias, + deserializer: Expression, + outputObjAttr: Attribute, child: SparkPlan) extends UnaryNode with CodegenSupport { - override def output: Seq[Attribute] = deserializer.toAttribute :: Nil + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def inputRDDs(): Seq[RDD[InternalRow]] = { child.asInstanceOf[CodegenSupport].inputRDDs() @@ -67,6 +70,7 @@ case class DeserializeToObject( case class SerializeFromObject( serializer: Seq[NamedExpression], child: SparkPlan) extends UnaryNode with CodegenSupport { + override def output: Seq[Attribute] = serializer.map(_.toAttribute) override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -98,60 +102,71 @@ case class SerializeFromObject( * Helper functions for physical operators that work with user defined objects. */ trait ObjectOperator extends SparkPlan { - def generateToObject(objExpr: Expression, inputSchema: Seq[Attribute]): InternalRow => Any = { - val objectProjection = GenerateSafeProjection.generate(objExpr :: Nil, inputSchema) - (i: InternalRow) => objectProjection(i).get(0, objExpr.dataType) + def deserializeRowToObject( + deserializer: Expression, + inputSchema: Seq[Attribute]): InternalRow => Any = { + val proj = GenerateSafeProjection.generate(deserializer :: Nil, inputSchema) + (i: InternalRow) => proj(i).get(0, deserializer.dataType) } - def generateToRow(serializer: Seq[Expression]): Any => InternalRow = { - val outputProjection = if (serializer.head.dataType.isInstanceOf[ObjectType]) { - GenerateSafeProjection.generate(serializer) - } else { - GenerateUnsafeProjection.generate(serializer) + def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = { + val proj = GenerateUnsafeProjection.generate(serializer) + val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head + val objRow = new SpecificMutableRow(objType :: Nil) + (o: Any) => { + objRow(0) = o + proj(objRow) } - val inputType = serializer.head.collect { case b: BoundReference => b.dataType }.head - val outputRow = new SpecificMutableRow(inputType :: Nil) + } + + def wrapObjectToRow(objType: DataType): Any => InternalRow = { + val outputRow = new SpecificMutableRow(objType :: Nil) (o: Any) => { outputRow(0) = o - outputProjection(outputRow) + outputRow } } + + def unwrapObjectFromRow(objType: DataType): InternalRow => Any = { + (i: InternalRow) => i.get(0, objType) + } } /** - * Applies the given function to each input row and encodes the result. + * Applies the given function to input object iterator. + * The output of its child must be a single-field row containing the input object. */ case class MapPartitions( func: Iterator[Any] => Iterator[Any], - deserializer: Expression, - serializer: Seq[NamedExpression], + outputObjAttr: Attribute, child: SparkPlan) extends UnaryNode with ObjectOperator { - override def output: Seq[Attribute] = serializer.map(_.toAttribute) + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsInternal { iter => - val getObject = generateToObject(deserializer, child.output) - val outputObject = generateToRow(serializer) + val getObject = unwrapObjectFromRow(child.output.head.dataType) + val outputObject = wrapObjectToRow(outputObjAttr.dataType) func(iter.map(getObject)).map(outputObject) } } } /** - * Applies the given function to each input row and encodes the result. + * Applies the given function to each input object. + * The output of its child must be a single-field row containing the input object. * - * Note that, each serializer expression needs the result object which is returned by the given - * function, as input. This operator uses some tricks to make sure we only calculate the result - * object once. We don't use [[Project]] directly as subexpression elimination doesn't work with - * whole stage codegen and it's confusing to show the un-common-subexpression-eliminated version of - * a project while explain. + * This operator is kind of a safe version of [[Project]], as it's output is custom object, we need + * to use safe row to contain it. */ case class MapElements( func: AnyRef, - deserializer: Expression, - serializer: Seq[NamedExpression], + outputObjAttr: Attribute, child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport { - override def output: Seq[Attribute] = serializer.map(_.toAttribute) + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def inputRDDs(): Seq[RDD[InternalRow]] = { child.asInstanceOf[CodegenSupport].inputRDDs() @@ -167,23 +182,14 @@ case class MapElements( case _ => classOf[Any => Any] -> "apply" } val funcObj = Literal.create(func, ObjectType(funcClass)) - val resultObjType = serializer.head.collect { case b: BoundReference => b }.head.dataType - val callFunc = Invoke(funcObj, methodName, resultObjType, Seq(deserializer)) + val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output) val bound = ExpressionCanonicalizer.execute( BindReferences.bindReference(callFunc, child.output)) ctx.currentVars = input - val evaluated = bound.genCode(ctx) - - val resultObj = LambdaVariable(evaluated.value, evaluated.isNull, resultObjType) - val outputFields = serializer.map(_ transform { - case _: BoundReference => resultObj - }) - val resultVars = outputFields.map(_.genCode(ctx)) - s""" - ${evaluated.code} - ${consume(ctx, resultVars)} - """ + val resultVars = bound.genCode(ctx) :: Nil + + consume(ctx, resultVars) } override protected def doExecute(): RDD[InternalRow] = { @@ -191,9 +197,10 @@ case class MapElements( case m: MapFunction[_, _] => i => m.asInstanceOf[MapFunction[Any, Any]].call(i) case _ => func.asInstanceOf[Any => Any] } + child.execute().mapPartitionsInternal { iter => - val getObject = generateToObject(deserializer, child.output) - val outputObject = generateToRow(serializer) + val getObject = unwrapObjectFromRow(child.output.head.dataType) + val outputObject = wrapObjectToRow(outputObjAttr.dataType) iter.map(row => outputObject(callFunc(getObject(row)))) } } @@ -216,15 +223,43 @@ case class AppendColumns( override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsInternal { iter => - val getObject = generateToObject(deserializer, child.output) + val getObject = deserializeRowToObject(deserializer, child.output) val combiner = GenerateUnsafeRowJoiner.create(child.schema, newColumnSchema) - val outputObject = generateToRow(serializer) + val outputObject = serializeObjectToRow(serializer) iter.map { row => val newColumns = outputObject(func(getObject(row))) + combiner.join(row.asInstanceOf[UnsafeRow], newColumns): InternalRow + } + } + } +} + +/** + * An optimized version of [[AppendColumns]], that can be executed on deserialized object directly. + */ +case class AppendColumnsWithObject( + func: Any => Any, + inputSerializer: Seq[NamedExpression], + newColumnsSerializer: Seq[NamedExpression], + child: SparkPlan) extends UnaryNode with ObjectOperator { + + override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute) - // This operates on the assumption that we always serialize the result... - combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]): InternalRow + private def inputSchema = inputSerializer.map(_.toAttribute).toStructType + private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType + + override protected def doExecute(): RDD[InternalRow] = { + child.execute().mapPartitionsInternal { iter => + val getChildObject = unwrapObjectFromRow(child.output.head.dataType) + val outputChildObject = serializeObjectToRow(inputSerializer) + val outputNewColumnOjb = serializeObjectToRow(newColumnsSerializer) + val combiner = GenerateUnsafeRowJoiner.create(inputSchema, newColumnSchema) + + iter.map { row => + val childObj = getChildObject(row) + val newColumns = outputNewColumnOjb(func(childObj)) + combiner.join(outputChildObject(childObj), newColumns): InternalRow } } } @@ -232,19 +267,19 @@ case class AppendColumns( /** * Groups the input rows together and calls the function with each group and an iterator containing - * all elements in the group. The result of this function is encoded and flattened before - * being output. + * all elements in the group. The result of this function is flattened before being output. */ case class MapGroups( func: (Any, Iterator[Any]) => TraversableOnce[Any], keyDeserializer: Expression, valueDeserializer: Expression, - serializer: Seq[NamedExpression], groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], + outputObjAttr: Attribute, child: SparkPlan) extends UnaryNode with ObjectOperator { - override def output: Seq[Attribute] = serializer.map(_.toAttribute) + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(groupingAttributes) :: Nil @@ -256,9 +291,9 @@ case class MapGroups( child.execute().mapPartitionsInternal { iter => val grouped = GroupedIterator(iter, groupingAttributes, child.output) - val getKey = generateToObject(keyDeserializer, groupingAttributes) - val getValue = generateToObject(valueDeserializer, dataAttributes) - val outputObject = generateToRow(serializer) + val getKey = deserializeRowToObject(keyDeserializer, groupingAttributes) + val getValue = deserializeRowToObject(valueDeserializer, dataAttributes) + val outputObject = wrapObjectToRow(outputObjAttr.dataType) grouped.flatMap { case (key, rowIter) => val result = func( @@ -273,22 +308,23 @@ case class MapGroups( /** * Co-groups the data from left and right children, and calls the function with each group and 2 * iterators containing all elements in the group from left and right side. - * The result of this function is encoded and flattened before being output. + * The result of this function is flattened before being output. */ case class CoGroup( func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any], keyDeserializer: Expression, leftDeserializer: Expression, rightDeserializer: Expression, - serializer: Seq[NamedExpression], leftGroup: Seq[Attribute], rightGroup: Seq[Attribute], leftAttr: Seq[Attribute], rightAttr: Seq[Attribute], + outputObjAttr: Attribute, left: SparkPlan, right: SparkPlan) extends BinaryNode with ObjectOperator { - override def output: Seq[Attribute] = serializer.map(_.toAttribute) + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil @@ -301,10 +337,10 @@ case class CoGroup( val leftGrouped = GroupedIterator(leftData, leftGroup, left.output) val rightGrouped = GroupedIterator(rightData, rightGroup, right.output) - val getKey = generateToObject(keyDeserializer, leftGroup) - val getLeft = generateToObject(leftDeserializer, leftAttr) - val getRight = generateToObject(rightDeserializer, rightAttr) - val outputObject = generateToRow(serializer) + val getKey = deserializeRowToObject(keyDeserializer, leftGroup) + val getLeft = deserializeRowToObject(leftDeserializer, leftAttr) + val getRight = deserializeRowToObject(rightDeserializer, rightAttr) + val outputObject = wrapObjectToRow(outputObjAttr.dataType) new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup).flatMap { case (key, leftResult, rightResult) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 23a0ce215f..2dca792c83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -201,7 +201,9 @@ abstract class QueryTest extends PlanTest { val logicalPlan = df.queryExecution.analyzed // bypass some cases that we can't handle currently. logicalPlan.transform { - case _: ObjectOperator => return + case _: ObjectConsumer => return + case _: ObjectProducer => return + case _: AppendColumns => return case _: LogicalRelation => return case _: MemoryPlan => return }.transformAllExpressions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 8efd9de29e..d7cf1dc6aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -79,7 +79,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val plan = ds.queryExecution.executedPlan assert(plan.find(p => p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[MapElements]).isDefined) + p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined) assert(ds.collect() === 0.until(10).map(_.toString).toArray) } |