aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2015-11-24 09:28:39 -0800
committerMichael Armbrust <michael@databricks.com>2015-11-24 09:28:39 -0800
commite5aaae6e1145b8c25c4872b2992ab425da9c6f9b (patch)
tree8d54936bb41ffca0fb875dab8b62c432f62880bc /sql/catalyst
parentbe9dd1550c1816559d3d418a19c692e715f1c94e (diff)
downloadspark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.tar.gz
spark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.tar.bz2
spark-e5aaae6e1145b8c25c4872b2992ab425da9c6f9b.zip
[SPARK-11942][SQL] fix encoder life cycle for CoGroup
we should pass in resolved encodera to logical `CoGroup` and bind them in physical `CoGroup` Author: Wenchen Fan <wenchen@databricks.com> Closes #9928 from cloud-fan/cogroup.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala27
1 files changed, 15 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 737e62fd59..5665fd7e5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -553,19 +553,22 @@ case class MapGroups[K, T, U](
/** Factory for constructing new `CoGroup` nodes. */
object CoGroup {
- def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder](
- func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
+ def apply[Key, Left, Right, Result : Encoder](
+ func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+ keyEnc: ExpressionEncoder[Key],
+ leftEnc: ExpressionEncoder[Left],
+ rightEnc: ExpressionEncoder[Right],
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],
left: LogicalPlan,
- right: LogicalPlan): CoGroup[K, Left, Right, R] = {
+ right: LogicalPlan): CoGroup[Key, Left, Right, Result] = {
CoGroup(
func,
- encoderFor[K],
- encoderFor[Left],
- encoderFor[Right],
- encoderFor[R],
- encoderFor[R].schema.toAttributes,
+ keyEnc,
+ leftEnc,
+ rightEnc,
+ encoderFor[Result],
+ encoderFor[Result].schema.toAttributes,
leftGroup,
rightGroup,
left,
@@ -577,12 +580,12 @@ object CoGroup {
* A relation produced by applying `func` to each grouping key and associated values from left and
* right children.
*/
-case class CoGroup[K, Left, Right, R](
- func: (K, Iterator[Left], Iterator[Right]) => TraversableOnce[R],
- kEncoder: ExpressionEncoder[K],
+case class CoGroup[Key, Left, Right, Result](
+ func: (Key, Iterator[Left], Iterator[Right]) => TraversableOnce[Result],
+ keyEnc: ExpressionEncoder[Key],
leftEnc: ExpressionEncoder[Left],
rightEnc: ExpressionEncoder[Right],
- rEncoder: ExpressionEncoder[R],
+ resultEnc: ExpressionEncoder[Result],
output: Seq[Attribute],
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],