diff options
author | Wenchen Fan <wenchen@databricks.com> | 2015-10-28 13:58:52 +0100 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-10-28 13:58:52 +0100 |
commit | 075ce4914fdcbbcc7286c3c30cb940ed28d474d2 (patch) | |
tree | f4eaa13efe6d0322649ad1be161e84ba9dd35e7e /sql/catalyst | |
parent | 5f1cee6f158adb1f9f485ed1d529c56bace68adc (diff) | |
download | spark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.tar.gz spark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.tar.bz2 spark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.zip |
[SPARK-11313][SQL] implement cogroup on DataSets (support 2 datasets)
A simpler version of https://github.com/apache/spark/pull/9279 that only supports 2 datasets.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #9324 from cloud-fan/cogroup2.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 1 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 39 |
2 files changed, 40 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 850838af9b..5ba14ebdb6 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -591,6 +591,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i))); build.append(','); } + build.deleteCharAt(build.length() - 1); build.append(']'); return build.toString(); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d2d3db0a44..4cb67aacf3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -513,3 +513,42 @@ case class MapGroups[K, T, U]( override def missingInput: AttributeSet = AttributeSet.empty } +/** Factory for constructing new `CoGroup` nodes. */ +object CoGroup { + def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder]( + func: (K, Iterator[Left], Iterator[Right]) => Iterator[R], + leftGroup: Seq[Attribute], + rightGroup: Seq[Attribute], + left: LogicalPlan, + right: LogicalPlan): CoGroup[K, Left, Right, R] = { + CoGroup( + func, + encoderFor[K], + encoderFor[Left], + encoderFor[Right], + encoderFor[R], + encoderFor[R].schema.toAttributes, + leftGroup, + rightGroup, + left, + right) + } +} + +/** + * A relation produced by applying `func` to each grouping key and associated values from left and + * right children. 
+ */ +case class CoGroup[K, Left, Right, R]( + func: (K, Iterator[Left], Iterator[Right]) => Iterator[R], + kEncoder: ExpressionEncoder[K], + leftEnc: ExpressionEncoder[Left], + rightEnc: ExpressionEncoder[Right], + rEncoder: ExpressionEncoder[R], + output: Seq[Attribute], + leftGroup: Seq[Attribute], + rightGroup: Seq[Attribute], + left: LogicalPlan, + right: LogicalPlan) extends BinaryNode { + override def missingInput: AttributeSet = AttributeSet.empty +} |