aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2015-10-28 13:58:52 +0100
committerMichael Armbrust <michael@databricks.com>2015-10-28 13:58:52 +0100
commit075ce4914fdcbbcc7286c3c30cb940ed28d474d2 (patch)
treef4eaa13efe6d0322649ad1be161e84ba9dd35e7e /sql/catalyst
parent5f1cee6f158adb1f9f485ed1d529c56bace68adc (diff)
downloadspark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.tar.gz
spark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.tar.bz2
spark-075ce4914fdcbbcc7286c3c30cb940ed28d474d2.zip
[SPARK-11313][SQL] implement cogroup on DataSets (support 2 datasets)
A simpler version of https://github.com/apache/spark/pull/9279 that supports only 2 datasets. Author: Wenchen Fan <wenchen@databricks.com> Closes #9324 from cloud-fan/cogroup2.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala39
2 files changed, 40 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 850838af9b..5ba14ebdb6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -591,6 +591,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
build.append(',');
}
+ build.deleteCharAt(build.length() - 1);
build.append(']');
return build.toString();
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index d2d3db0a44..4cb67aacf3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -513,3 +513,42 @@ case class MapGroups[K, T, U](
override def missingInput: AttributeSet = AttributeSet.empty
}
+/** Factory for `CoGroup` nodes; fills in the four encoders and `output` via `encoderFor`. */
+object CoGroup {
+ def apply[K : Encoder, Left : Encoder, Right : Encoder, R : Encoder](
+ func: (K, Iterator[Left], Iterator[Right]) => Iterator[R],
+ leftGroup: Seq[Attribute],
+ rightGroup: Seq[Attribute],
+ left: LogicalPlan,
+ right: LogicalPlan): CoGroup[K, Left, Right, R] = {
+ CoGroup(
+ func,
+ encoderFor[K],
+ encoderFor[Left],
+ encoderFor[Right],
+ encoderFor[R],
+ encoderFor[R].schema.toAttributes, // passed as `output`: attributes of R's encoder schema
+ leftGroup,
+ rightGroup,
+ left,
+ right)
+ }
+}
+
+/**
+ * A binary node produced by applying `func` to each grouping key and the associated values from the
+ * `left` and `right` children; `func` yields an iterator of `R` results.
+ */
+case class CoGroup[K, Left, Right, R](
+ func: (K, Iterator[Left], Iterator[Right]) => Iterator[R],
+ kEncoder: ExpressionEncoder[K],
+ leftEnc: ExpressionEncoder[Left],
+ rightEnc: ExpressionEncoder[Right],
+ rEncoder: ExpressionEncoder[R],
+ output: Seq[Attribute],
+ leftGroup: Seq[Attribute],
+ rightGroup: Seq[Attribute],
+ left: LogicalPlan,
+ right: LogicalPlan) extends BinaryNode {
+ override def missingInput: AttributeSet = AttributeSet.empty // always reports no missing input
+}