aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorKevin Yu <qyu@us.ibm.com>2016-05-07 11:13:48 +0800
committerWenchen Fan <wenchen@databricks.com>2016-05-07 11:13:48 +0800
commit607a27a0d149be049091bcf274a73b8476b36c90 (patch)
tree5492c0b4584c78caaf19f82d9a56da9139f71166 /sql
parenta21a3bbe6931e162c53a61daff1ef428fb802b8a (diff)
downloadspark-607a27a0d149be049091bcf274a73b8476b36c90.tar.gz
spark-607a27a0d149be049091bcf274a73b8476b36c90.tar.bz2
spark-607a27a0d149be049091bcf274a73b8476b36c90.zip
[SPARK-15051][SQL] Create a TypedColumn alias
## What changes were proposed in this pull request? Currently when we create an alias against a TypedColumn from a user-defined Aggregator (for example: agg(aggSum.toColumn as "a")), Spark uses the alias function from Column (as); that alias function returns a column containing a TypedAggregateExpression, which is unresolved because the inputDeserializer is not defined. Later the aggregate function (agg) will inject the inputDeserializer back into the TypedAggregateExpression, but only if the aggregate columns are TypedColumn; in the above case, the TypedAggregateExpression will remain unresolved because it is under a Column, which caused the problem reported by this jira [15051](https://issues.apache.org/jira/browse/SPARK-15051?jql=project%20%3D%20SPARK). This PR proposes to create an alias function for TypedColumn that returns a TypedColumn. It uses a similar code path to Column's alias function. For the Spark built-in aggregate functions, like max, aliasing already works; for example: val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") checkAnswer(df1.agg(max("j") as "b"), Row(3) :: Nil) Thanks for comments. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) Added test cases in DatasetAggregatorSuite.scala and ran the sql-related queries against this patch. Author: Kevin Yu <qyu@us.ibm.com> Closes #12893 from kevinyu98/spark-15051.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Column.scala19
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala8
2 files changed, 21 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index c58addaf90..9b8334d334 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -68,6 +68,18 @@ class TypedColumn[-T, U](
}
new TypedColumn[T, U](newExpr, encoder)
}
+
+ /**
+ * Gives the TypedColumn a name (alias).
+ * If the current TypedColumn has metadata associated with it, this metadata will be propagated
+ * to the new column.
+ *
+ * @group expr_ops
+ * @since 2.0.0
+ */
+ override def name(alias: String): TypedColumn[T, U] =
+ new TypedColumn[T, U](super.name(alias).expr, encoder)
+
}
/**
@@ -910,12 +922,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.3.0
*/
- def as(alias: Symbol): Column = withExpr {
- expr match {
- case ne: NamedExpression => Alias(expr, alias.name)(explicitMetadata = Some(ne.metadata))
- case other => Alias(other, alias.name)()
- }
- }
+ def as(alias: Symbol): Column = name(alias.name)
/**
* Gives the column an alias with metadata.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 6eae3ed7ad..b2a0f3d67e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -232,4 +232,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
"a" -> Seq(1, 2)
)
}
+
+ test("spark-15051 alias of aggregator in DataFrame/Dataset[Row]") {
+ val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+ checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil)
+
+ val df2 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+ checkAnswer(df2.agg(RowAgg.toColumn as "b").select("b"), Row(6) :: Nil)
+ }
}