author     Reynold Xin <rxin@databricks.com>    2015-11-23 22:22:15 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-11-23 22:22:15 -0800
commit     8d57524662fad4a0760f3bc924e690c2a110e7f7 (patch)
tree       f4723119c7eb2f5b2ab0fc75a715761dbda386a0 /sql
parent     026ea2eab1f3cde270e8a6391d002915f3e1c6e5 (diff)
[SPARK-11933][SQL] Rename mapGroup -> mapGroups and flatMapGroup -> flatMapGroups.
Based on feedback from Matei, this is more consistent with mapPartitions in Spark. Also addresses some of the cleanups from a previous commit that renames the type variables.

Author: Reynold Xin <rxin@databricks.com>

Closes #9919 from rxin/SPARK-11933.
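For orientation, here is a minimal sketch of how the renamed API reads after this patch. It mirrors the calls exercised in the test suites below; the sample data is illustrative, and it assumes a Spark 1.6 SQLContext with its implicits in scope for toDS():

    // Illustrative sketch of the renamed API (not part of this patch).
    import sqlContext.implicits._

    val ds = Seq(1, 2, 3, 4, 5).toDS()
    val grouped = ds.groupBy(_ % 2)

    // mapGroups (formerly mapGroup): exactly one output row per key.
    val counts = grouped.mapGroups { case (g, iter) =>
      val name = if (g == 0) "even" else "odd"
      (name, iter.size)
    }

    // flatMapGroups (formerly flatMapGroup): zero or more output rows per key.
    val expanded = grouped.flatMapGroups { case (g, iter) =>
      Iterator(g.toString, iter.mkString)
    }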
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala         36
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java    10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala   4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala           12
4 files changed, 31 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 7f43ce1690..793a86b132 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.expressions.Aggregator
@Experimental
class GroupedDataset[K, V] private[sql](
kEncoder: Encoder[K],
- tEncoder: Encoder[V],
+ vEncoder: Encoder[V],
val queryExecution: QueryExecution,
private val dataAttributes: Seq[Attribute],
private val groupingAttributes: Seq[Attribute]) extends Serializable {
@@ -53,12 +53,12 @@ class GroupedDataset[K, V] private[sql](
// queryexecution.
private implicit val unresolvedKEncoder = encoderFor(kEncoder)
- private implicit val unresolvedTEncoder = encoderFor(tEncoder)
+ private implicit val unresolvedVEncoder = encoderFor(vEncoder)
private val resolvedKEncoder =
unresolvedKEncoder.resolve(groupingAttributes, OuterScopes.outerScopes)
- private val resolvedTEncoder =
- unresolvedTEncoder.resolve(dataAttributes, OuterScopes.outerScopes)
+ private val resolvedVEncoder =
+ unresolvedVEncoder.resolve(dataAttributes, OuterScopes.outerScopes)
private def logicalPlan = queryExecution.analyzed
private def sqlContext = queryExecution.sqlContext
@@ -76,7 +76,7 @@ class GroupedDataset[K, V] private[sql](
def keyAs[L : Encoder]: GroupedDataset[L, V] =
new GroupedDataset(
encoderFor[L],
- unresolvedTEncoder,
+ unresolvedVEncoder,
queryExecution,
dataAttributes,
groupingAttributes)
@@ -110,13 +110,13 @@ class GroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
- def flatMapGroup[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
+ def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
new Dataset[U](
sqlContext,
MapGroups(
f,
resolvedKEncoder,
- resolvedTEncoder,
+ resolvedVEncoder,
groupingAttributes,
logicalPlan))
}
@@ -138,8 +138,8 @@ class GroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
- def flatMapGroup[U](f: FlatMapGroupFunction[K, V, U], encoder: Encoder[U]): Dataset[U] = {
- flatMapGroup((key, data) => f.call(key, data.asJava).asScala)(encoder)
+ def flatMapGroups[U](f: FlatMapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] = {
+ flatMapGroups((key, data) => f.call(key, data.asJava).asScala)(encoder)
}
/**
@@ -158,9 +158,9 @@ class GroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
- def mapGroup[U : Encoder](f: (K, Iterator[V]) => U): Dataset[U] = {
+ def mapGroups[U : Encoder](f: (K, Iterator[V]) => U): Dataset[U] = {
val func = (key: K, it: Iterator[V]) => Iterator(f(key, it))
- flatMapGroup(func)
+ flatMapGroups(func)
}
/**
@@ -179,8 +179,8 @@ class GroupedDataset[K, V] private[sql](
*
* @since 1.6.0
*/
- def mapGroup[U](f: MapGroupFunction[K, V, U], encoder: Encoder[U]): Dataset[U] = {
- mapGroup((key, data) => f.call(key, data.asJava))(encoder)
+ def mapGroups[U](f: MapGroupsFunction[K, V, U], encoder: Encoder[U]): Dataset[U] = {
+ mapGroups((key, data) => f.call(key, data.asJava))(encoder)
}
/**
@@ -192,8 +192,8 @@ class GroupedDataset[K, V] private[sql](
def reduce(f: (V, V) => V): Dataset[(K, V)] = {
val func = (key: K, it: Iterator[V]) => Iterator((key, it.reduce(f)))
- implicit val resultEncoder = ExpressionEncoder.tuple(unresolvedKEncoder, unresolvedTEncoder)
- flatMapGroup(func)
+ implicit val resultEncoder = ExpressionEncoder.tuple(unresolvedKEncoder, unresolvedVEncoder)
+ flatMapGroups(func)
}
/**
@@ -213,7 +213,7 @@ class GroupedDataset[K, V] private[sql](
private def withEncoder(c: Column): Column = c match {
case tc: TypedColumn[_, _] =>
- tc.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes)
+ tc.withInputType(resolvedVEncoder.bind(dataAttributes), dataAttributes)
case _ => c
}
@@ -227,7 +227,7 @@ class GroupedDataset[K, V] private[sql](
val encoders = columns.map(_.encoder)
val namedColumns =
columns.map(
- _.withInputType(resolvedTEncoder, dataAttributes).named)
+ _.withInputType(resolvedVEncoder, dataAttributes).named)
val keyColumn = if (groupingAttributes.length > 1) {
Alias(CreateStruct(groupingAttributes), "key")()
} else {
@@ -304,7 +304,7 @@ class GroupedDataset[K, V] private[sql](
def cogroup[U, R : Encoder](
other: GroupedDataset[K, U])(
f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
- implicit def uEnc: Encoder[U] = other.unresolvedTEncoder
+ implicit def uEnc: Encoder[U] = other.unresolvedVEncoder
new Dataset[R](
sqlContext,
CoGroup(
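Note that the hunks above also show reduce being defined in terms of the renamed flatMapGroups, so the rename threads through it. A hedged sketch of that equivalence, with illustrative data:

    // Illustrative only: grouped.reduce(f) expands to the
    // Iterator((key, it.reduce(f))) body shown in the hunk above.
    val pairs = Seq(("a", 10), ("a", 20), ("b", 1)).toDS()
    val byKey = pairs.groupBy(_._1)

    val merge = (x: (String, Int), y: (String, Int)) => (x._1, x._2 + y._2)

    val viaReduce  = byKey.reduce(merge)  // Dataset[(String, (String, Int))]
    val viaFlatMap = byKey.flatMapGroups { case (k, it) =>
      Iterator((k, it.reduce(merge)))
    }
    // viaReduce and viaFlatMap produce the same rows.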
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index cf335efdd2..67a3190cb7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -170,7 +170,7 @@ public class JavaDatasetSuite implements Serializable {
}
}, Encoders.INT());
- Dataset<String> mapped = grouped.mapGroup(new MapGroupFunction<Integer, String, String>() {
+ Dataset<String> mapped = grouped.mapGroups(new MapGroupsFunction<Integer, String, String>() {
@Override
public String call(Integer key, Iterator<String> values) throws Exception {
StringBuilder sb = new StringBuilder(key.toString());
@@ -183,8 +183,8 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(Arrays.asList("1a", "3foobar"), mapped.collectAsList());
- Dataset<String> flatMapped = grouped.flatMapGroup(
- new FlatMapGroupFunction<Integer, String, String>() {
+ Dataset<String> flatMapped = grouped.flatMapGroups(
+ new FlatMapGroupsFunction<Integer, String, String>() {
@Override
public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
StringBuilder sb = new StringBuilder(key.toString());
@@ -249,8 +249,8 @@ public class JavaDatasetSuite implements Serializable {
GroupedDataset<Integer, String> grouped =
ds.groupBy(length(col("value"))).keyAs(Encoders.INT());
- Dataset<String> mapped = grouped.mapGroup(
- new MapGroupFunction<Integer, String, String>() {
+ Dataset<String> mapped = grouped.mapGroups(
+ new MapGroupsFunction<Integer, String, String>() {
@Override
public String call(Integer key, Iterator<String> data) throws Exception {
StringBuilder sb = new StringBuilder(key.toString());
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
index d387710357..f75d096182 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
@@ -86,7 +86,7 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
test("groupBy function, map") {
val ds = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).toDS()
val grouped = ds.groupBy(_ % 2)
- val agged = grouped.mapGroup { case (g, iter) =>
+ val agged = grouped.mapGroups { case (g, iter) =>
val name = if (g == 0) "even" else "odd"
(name, iter.size)
}
@@ -99,7 +99,7 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
test("groupBy function, flatMap") {
val ds = Seq("a", "b", "c", "xyz", "hello").toDS()
val grouped = ds.groupBy(_.length)
- val agged = grouped.flatMapGroup { case (g, iter) => Iterator(g.toString, iter.mkString) }
+ val agged = grouped.flatMapGroups { case (g, iter) => Iterator(g.toString, iter.mkString) }
checkAnswer(
agged,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index cc8e4325fd..dbdd7ba14a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -224,7 +224,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy function, map") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy(v => (v._1, "word"))
- val agged = grouped.mapGroup { case (g, iter) => (g._1, iter.map(_._2).sum) }
+ val agged = grouped.mapGroups { case (g, iter) => (g._1, iter.map(_._2).sum) }
checkAnswer(
agged,
@@ -234,7 +234,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy function, flatMap") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy(v => (v._1, "word"))
- val agged = grouped.flatMapGroup { case (g, iter) =>
+ val agged = grouped.flatMapGroups { case (g, iter) =>
Iterator(g._1, iter.map(_._2).sum.toString)
}
@@ -255,7 +255,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy columns, map") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1")
- val agged = grouped.mapGroup { case (g, iter) => (g.getString(0), iter.map(_._2).sum) }
+ val agged = grouped.mapGroups { case (g, iter) => (g.getString(0), iter.map(_._2).sum) }
checkAnswer(
agged,
@@ -265,7 +265,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy columns asKey, map") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1").keyAs[String]
- val agged = grouped.mapGroup { case (g, iter) => (g, iter.map(_._2).sum) }
+ val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
checkAnswer(
agged,
@@ -275,7 +275,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy columns asKey tuple, map") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1", lit(1)).keyAs[(String, Int)]
- val agged = grouped.mapGroup { case (g, iter) => (g, iter.map(_._2).sum) }
+ val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
checkAnswer(
agged,
@@ -285,7 +285,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("groupBy columns asKey class, map") {
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
val grouped = ds.groupBy($"_1".as("a"), lit(1).as("b")).keyAs[ClassData]
- val agged = grouped.mapGroup { case (g, iter) => (g, iter.map(_._2).sum) }
+ val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
checkAnswer(
agged,
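For completeness, a sketch of the typed-key path exercised in the last hunks, where keyAs[String] converts the untyped grouping column before mapGroups runs; the expected rows, elided by the hunk boundaries above, are spelled out in the trailing comment:

    // Illustrative only: mirrors the "groupBy columns asKey, map" test above.
    val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
    val grouped = ds.groupBy($"_1").keyAs[String]
    val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
    // agged contains ("a", 30), ("b", 3), ("c", 1).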