diff options
author | Wenchen Fan <wenchen@databricks.com> | 2015-11-24 09:28:39 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-11-24 09:28:39 -0800 |
commit | e5aaae6e1145b8c25c4872b2992ab425da9c6f9b (patch) | |
tree | 8d54936bb41ffca0fb875dab8b62c432f62880bc /sql/catalyst | |
parent | be9dd1550c1816559d3d418a19c692e715f1c94e (diff) | |
download | spark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.tar.gz spark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.tar.bz2 spark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.zip |
[SPARK-11942][SQL] fix encoder life cycle for CoGroup
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup`
Author: Wenchen Fan <wenchen@databricks.com>
Closes #9928 from cloud-fan/cogroup.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 737e62fd59..5665fd7e5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -553,19 +553,22 @@ case class MapGroups[K, T, U]( /** Factory for constructing new `CoGroup` nodes. */ object CoGroup { - def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder]( - func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R], + def apply[Key, Left, Right, Result : Encoder]( + func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result], + keyEnc: ExpressionEncoder[Key], + leftEnc: ExpressionEncoder[Left], + rightEnc: ExpressionEncoder[Right], leftGroup: Seq[Attribute], rightGroup: Seq[Attribute], left: LogicalPlan, - right: LogicalPlan): CoGroup[K, Left, Right, R] = { + right: LogicalPlan): CoGroup[Key, Left, Right, Result] = { CoGroup( func, - encoderFor[K], - encoderFor[Left], - encoderFor[Right], - encoderFor[R], - encoderFor[R].schema.toAttributes, + keyEnc, + leftEnc, + rightEnc, + encoderFor[Result], + encoderFor[Result].schema.toAttributes, leftGroup, rightGroup, left, @@ -577,12 +580,12 @@ object CoGroup { * A relation produced by applying `func` to each grouping key and associated values from left and * right children. */ -case class CoGroup[K, Left, Right, R]( - func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R], - kEncoder: ExpressionEncoder[K], +case class CoGroup[Key, Left, Right, Result]( + func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result], + keyEnc: ExpressionEncoder[Key], leftEnc: ExpressionEncoder[Left], rightEnc: ExpressionEncoder[Right], - rEncoder: ExpressionEncoder[R], + resultEnc: ExpressionEncoder[Result], output: Seq[Attribute], leftGroup: Seq[Attribute], rightGroup: Seq[Attribute], |