Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                       10
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala     22
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala    4
-rw-r--r--   sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala            160
4 files changed, 119 insertions, 77 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 1a09d70fb9..3c708cbf29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2251,16 +2251,16 @@ class Dataset[T] private[sql](
def unpersist(): this.type = unpersist(blocking = false)
/**
- * Represents the content of the [[Dataset]] as an [[RDD]] of [[Row]]s. Note that the RDD is
- * memoized. Once called, it won't change even if you change any query planning related Spark SQL
- * configurations (e.g. `spark.sql.shuffle.partitions`).
+ * Represents the content of the [[Dataset]] as an [[RDD]] of [[T]].
*
* @group rdd
* @since 1.6.0
*/
lazy val rdd: RDD[T] = {
- queryExecution.toRdd.mapPartitions { rows =>
- rows.map(boundTEncoder.fromRow)
+ val objectType = unresolvedTEncoder.deserializer.dataType
+ val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+ sqlContext.executePlan(deserialized).toRdd.mapPartitions { rows =>
+ rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
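For context, a usage-level sketch of the behavior changed above (not part of the patch; `Person` and the local `sqlContext` are assumed for illustration): `Dataset.rdd` now plans a `DeserializeToObject` node over the logical plan and pulls the typed object out of each single-field row, instead of decoding rows with `boundTEncoder`.

    import org.apache.spark.rdd.RDD

    // Illustrative only: the returned RDD carries the domain objects themselves.
    case class Person(name: String, age: Long)

    import sqlContext.implicits._
    val ds = sqlContext.createDataset(Seq(Person("a", 1L), Person("b", 2L)))
    val people: RDD[Person] = ds.rdd      // each InternalRow holds one Person at ordinal 0
    people.map(_.age).collect()           // Array(1L, 2L)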
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c15aaed365..a4b0fa59db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -346,21 +346,23 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw new IllegalStateException(
"logical intersect operator should have been replaced by semi-join in the optimizer")
- case logical.DeserializeToObject(deserializer, child) =>
- execution.DeserializeToObject(deserializer, planLater(child)) :: Nil
+ case logical.DeserializeToObject(deserializer, objAttr, child) =>
+ execution.DeserializeToObject(deserializer, objAttr, planLater(child)) :: Nil
case logical.SerializeFromObject(serializer, child) =>
execution.SerializeFromObject(serializer, planLater(child)) :: Nil
- case logical.MapPartitions(f, in, out, child) =>
- execution.MapPartitions(f, in, out, planLater(child)) :: Nil
- case logical.MapElements(f, in, out, child) =>
- execution.MapElements(f, in, out, planLater(child)) :: Nil
+ case logical.MapPartitions(f, objAttr, child) =>
+ execution.MapPartitions(f, objAttr, planLater(child)) :: Nil
+ case logical.MapElements(f, objAttr, child) =>
+ execution.MapElements(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, in, out, child) =>
execution.AppendColumns(f, in, out, planLater(child)) :: Nil
- case logical.MapGroups(f, key, in, out, grouping, data, child) =>
- execution.MapGroups(f, key, in, out, grouping, data, planLater(child)) :: Nil
- case logical.CoGroup(f, keyObj, lObj, rObj, out, lGroup, rGroup, lAttr, rAttr, left, right) =>
+ case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
+ execution.AppendColumnsWithObject(f, childSer, newSer, planLater(child)) :: Nil
+ case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
+ execution.MapGroups(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil
+ case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) =>
execution.CoGroup(
- f, keyObj, lObj, rObj, out, lGroup, rGroup, lAttr, rAttr,
+ f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
planLater(left), planLater(right)) :: Nil
case logical.Repartition(numPartitions, shuffle, child) =>
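A usage-level illustration of the typed operations these strategy cases plan (not part of the patch; assumes the Spark 2.0 `groupByKey` API and an implicits import): `groupByKey` followed by `mapGroups` is planned through logical.MapGroups, and `cogroup` on two grouped Datasets through logical.CoGroup, each now carrying a single output object attribute instead of a serializer.

    import sqlContext.implicits._

    val left  = Seq((1, "a"), (1, "b"), (2, "c")).toDS()
    val right = Seq((1, "x"), (3, "y")).toDS()

    // Planned through logical.MapGroups -> execution.MapGroups
    val counts = left.groupByKey(_._1).mapGroups { (k, vs) => (k, vs.size) }

    // Planned through logical.CoGroup -> execution.CoGroup
    val joined = left.groupByKey(_._1).cogroup(right.groupByKey(_._1)) {
      (k, l, r) => Iterator((k, l.size, r.size))
    }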
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 46eaede5e7..23b2eabd0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -473,6 +473,10 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
* Inserts a WholeStageCodegen on top of those that support codegen.
*/
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
+ // For operators that output domain objects, do not insert WholeStageCodegen, as a domain
+ // object cannot be written into an unsafe row.
+ case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
+ plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegen(insertInputAdapter(plan))
case other =>
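The guard added above can be read as a standalone predicate; a minimal sketch (the patch inlines this test directly in insertWholeStageCodegen): an operator whose only output column has ObjectType produces a domain object, which cannot be written into an UnsafeRow, so it must stay outside any WholeStageCodegen stage.

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.types.ObjectType

    // True for object-producing operators such as DeserializeToObject, MapPartitions,
    // MapElements, MapGroups and CoGroup after this refactoring.
    def outputsDomainObject(plan: SparkPlan): Boolean =
      plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType]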
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index e7261fc512..7c8bc7fed8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -25,16 +25,19 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.types.ObjectType
+import org.apache.spark.sql.types.{DataType, ObjectType}
/**
* Takes the input row from child and turns it into object using the given deserializer expression.
* The output of this operator is a single-field safe row containing the deserialized object.
*/
case class DeserializeToObject(
- deserializer: Alias,
+ deserializer: Expression,
+ outputObjAttr: Attribute,
child: SparkPlan) extends UnaryNode with CodegenSupport {
- override def output: Seq[Attribute] = deserializer.toAttribute :: Nil
+
+ override def output: Seq[Attribute] = outputObjAttr :: Nil
+ override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
@@ -67,6 +70,7 @@ case class DeserializeToObject(
case class SerializeFromObject(
serializer: Seq[NamedExpression],
child: SparkPlan) extends UnaryNode with CodegenSupport {
+
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -98,60 +102,71 @@ case class SerializeFromObject(
* Helper functions for physical operators that work with user defined objects.
*/
trait ObjectOperator extends SparkPlan {
- def generateToObject(objExpr: Expression, inputSchema: Seq[Attribute]): InternalRow => Any = {
- val objectProjection = GenerateSafeProjection.generate(objExpr :: Nil, inputSchema)
- (i: InternalRow) => objectProjection(i).get(0, objExpr.dataType)
+ def deserializeRowToObject(
+ deserializer: Expression,
+ inputSchema: Seq[Attribute]): InternalRow => Any = {
+ val proj = GenerateSafeProjection.generate(deserializer :: Nil, inputSchema)
+ (i: InternalRow) => proj(i).get(0, deserializer.dataType)
}
- def generateToRow(serializer: Seq[Expression]): Any => InternalRow = {
- val outputProjection = if (serializer.head.dataType.isInstanceOf[ObjectType]) {
- GenerateSafeProjection.generate(serializer)
- } else {
- GenerateUnsafeProjection.generate(serializer)
+ def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
+ val proj = GenerateUnsafeProjection.generate(serializer)
+ val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
+ val objRow = new SpecificMutableRow(objType :: Nil)
+ (o: Any) => {
+ objRow(0) = o
+ proj(objRow)
}
- val inputType = serializer.head.collect { case b: BoundReference => b.dataType }.head
- val outputRow = new SpecificMutableRow(inputType :: Nil)
+ }
+
+ def wrapObjectToRow(objType: DataType): Any => InternalRow = {
+ val outputRow = new SpecificMutableRow(objType :: Nil)
(o: Any) => {
outputRow(0) = o
- outputProjection(outputRow)
+ outputRow
}
}
+
+ def unwrapObjectFromRow(objType: DataType): InternalRow => Any = {
+ (i: InternalRow) => i.get(0, objType)
+ }
}
/**
- * Applies the given function to each input row and encodes the result.
+ * Applies the given function to the input object iterator.
+ * The output of its child must be a single-field row containing the input object.
*/
case class MapPartitions(
func: Iterator[Any] => Iterator[Any],
- deserializer: Expression,
- serializer: Seq[NamedExpression],
+ outputObjAttr: Attribute,
child: SparkPlan) extends UnaryNode with ObjectOperator {
- override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+
+ override def output: Seq[Attribute] = outputObjAttr :: Nil
+ override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
- val getObject = generateToObject(deserializer, child.output)
- val outputObject = generateToRow(serializer)
+ val getObject = unwrapObjectFromRow(child.output.head.dataType)
+ val outputObject = wrapObjectToRow(outputObjAttr.dataType)
func(iter.map(getObject)).map(outputObject)
}
}
}
/**
- * Applies the given function to each input row and encodes the result.
+ * Applies the given function to each input object.
+ * The output of its child must be a single-field row containing the input object.
*
- * Note that, each serializer expression needs the result object which is returned by the given
- * function, as input. This operator uses some tricks to make sure we only calculate the result
- * object once. We don't use [[Project]] directly as subexpression elimination doesn't work with
- * whole stage codegen and it's confusing to show the un-common-subexpression-eliminated version of
- * a project while explain.
+ * This operator is kind of a safe version of [[Project]]: since its output is a custom object, we
+ * need to use a safe row to contain it.
*/
case class MapElements(
func: AnyRef,
- deserializer: Expression,
- serializer: Seq[NamedExpression],
+ outputObjAttr: Attribute,
child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport {
- override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+
+ override def output: Seq[Attribute] = outputObjAttr :: Nil
+ override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
@@ -167,23 +182,14 @@ case class MapElements(
case _ => classOf[Any => Any] -> "apply"
}
val funcObj = Literal.create(func, ObjectType(funcClass))
- val resultObjType = serializer.head.collect { case b: BoundReference => b }.head.dataType
- val callFunc = Invoke(funcObj, methodName, resultObjType, Seq(deserializer))
+ val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output)
val bound = ExpressionCanonicalizer.execute(
BindReferences.bindReference(callFunc, child.output))
ctx.currentVars = input
- val evaluated = bound.genCode(ctx)
-
- val resultObj = LambdaVariable(evaluated.value, evaluated.isNull, resultObjType)
- val outputFields = serializer.map(_ transform {
- case _: BoundReference => resultObj
- })
- val resultVars = outputFields.map(_.genCode(ctx))
- s"""
- ${evaluated.code}
- ${consume(ctx, resultVars)}
- """
+ val resultVars = bound.genCode(ctx) :: Nil
+
+ consume(ctx, resultVars)
}
override protected def doExecute(): RDD[InternalRow] = {
@@ -191,9 +197,10 @@ case class MapElements(
case m: MapFunction[_, _] => i => m.asInstanceOf[MapFunction[Any, Any]].call(i)
case _ => func.asInstanceOf[Any => Any]
}
+
child.execute().mapPartitionsInternal { iter =>
- val getObject = generateToObject(deserializer, child.output)
- val outputObject = generateToRow(serializer)
+ val getObject = unwrapObjectFromRow(child.output.head.dataType)
+ val outputObject = wrapObjectToRow(outputObjAttr.dataType)
iter.map(row => outputObject(callFunc(getObject(row))))
}
}
@@ -216,15 +223,43 @@ case class AppendColumns(
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
- val getObject = generateToObject(deserializer, child.output)
+ val getObject = deserializeRowToObject(deserializer, child.output)
val combiner = GenerateUnsafeRowJoiner.create(child.schema, newColumnSchema)
- val outputObject = generateToRow(serializer)
+ val outputObject = serializeObjectToRow(serializer)
iter.map { row =>
val newColumns = outputObject(func(getObject(row)))
- // This operates on the assumption that we always serialize the result...
- combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]): InternalRow
+ combiner.join(row.asInstanceOf[UnsafeRow], newColumns): InternalRow
+ }
+ }
+ }
+}
+
+/**
+ * An optimized version of [[AppendColumns]] that can be executed directly on deserialized objects.
+ */
+case class AppendColumnsWithObject(
+ func: Any => Any,
+ inputSerializer: Seq[NamedExpression],
+ newColumnsSerializer: Seq[NamedExpression],
+ child: SparkPlan) extends UnaryNode with ObjectOperator {
+
+ override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute)
+
+ private def inputSchema = inputSerializer.map(_.toAttribute).toStructType
+ private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitionsInternal { iter =>
+ val getChildObject = unwrapObjectFromRow(child.output.head.dataType)
+ val outputChildObject = serializeObjectToRow(inputSerializer)
+ val outputNewColumnObj = serializeObjectToRow(newColumnsSerializer)
+ val combiner = GenerateUnsafeRowJoiner.create(inputSchema, newColumnSchema)
+
+ iter.map { row =>
+ val childObj = getChildObject(row)
+ val newColumns = outputNewColumnObj(func(childObj))
+ combiner.join(outputChildObject(childObj), newColumns): InternalRow
}
}
}
@@ -232,19 +267,19 @@ case class AppendColumns(
/**
* Groups the input rows together and calls the function with each group and an iterator containing
- * all elements in the group. The result of this function is encoded and flattened before
- * being output.
+ * all elements in the group. The result of this function is flattened before being output.
*/
case class MapGroups(
func: (Any, Iterator[Any]) => TraversableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
- serializer: Seq[NamedExpression],
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
+ outputObjAttr: Attribute,
child: SparkPlan) extends UnaryNode with ObjectOperator {
- override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+ override def output: Seq[Attribute] = outputObjAttr :: Nil
+ override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(groupingAttributes) :: Nil
@@ -256,9 +291,9 @@ case class MapGroups(
child.execute().mapPartitionsInternal { iter =>
val grouped = GroupedIterator(iter, groupingAttributes, child.output)
- val getKey = generateToObject(keyDeserializer, groupingAttributes)
- val getValue = generateToObject(valueDeserializer, dataAttributes)
- val outputObject = generateToRow(serializer)
+ val getKey = deserializeRowToObject(keyDeserializer, groupingAttributes)
+ val getValue = deserializeRowToObject(valueDeserializer, dataAttributes)
+ val outputObject = wrapObjectToRow(outputObjAttr.dataType)
grouped.flatMap { case (key, rowIter) =>
val result = func(
@@ -273,22 +308,23 @@ case class MapGroups(
/**
* Co-groups the data from left and right children, and calls the function with each group and 2
* iterators containing all elements in the group from left and right side.
- * The result of this function is encoded and flattened before being output.
+ * The result of this function is flattened before being output.
*/
case class CoGroup(
func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
keyDeserializer: Expression,
leftDeserializer: Expression,
rightDeserializer: Expression,
- serializer: Seq[NamedExpression],
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],
leftAttr: Seq[Attribute],
rightAttr: Seq[Attribute],
+ outputObjAttr: Attribute,
left: SparkPlan,
right: SparkPlan) extends BinaryNode with ObjectOperator {
- override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+ override def output: Seq[Attribute] = outputObjAttr :: Nil
+ override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
@@ -301,10 +337,10 @@ case class CoGroup(
val leftGrouped = GroupedIterator(leftData, leftGroup, left.output)
val rightGrouped = GroupedIterator(rightData, rightGroup, right.output)
- val getKey = generateToObject(keyDeserializer, leftGroup)
- val getLeft = generateToObject(leftDeserializer, leftAttr)
- val getRight = generateToObject(rightDeserializer, rightAttr)
- val outputObject = generateToRow(serializer)
+ val getKey = deserializeRowToObject(keyDeserializer, leftGroup)
+ val getLeft = deserializeRowToObject(leftDeserializer, leftAttr)
+ val getRight = deserializeRowToObject(rightDeserializer, rightAttr)
+ val outputObject = wrapObjectToRow(outputObjAttr.dataType)
new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup).flatMap {
case (key, leftResult, rightResult) =>
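Finally, a standalone sketch of the single-field object row convention that wrapObjectToRow and unwrapObjectFromRow rely on (illustrative only, using the catalyst internals as they exist at this version): the domain object sits at ordinal 0 of a mutable row whose single column carries its ObjectType, and is read back with get(0, objType).

    import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
    import org.apache.spark.sql.types.ObjectType

    val objType = ObjectType(classOf[String])
    val row = new SpecificMutableRow(objType :: Nil)
    row(0) = "hello"                                      // what wrapObjectToRow does per object
    val back = row.get(0, objType).asInstanceOf[String]   // what unwrapObjectFromRow does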