aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-04-20 18:54:01 -0700
committerMichael Armbrust <michael@databricks.com>2015-04-20 18:54:01 -0700
commitc736220dac51cf73181fdd7f621c960c4e7bf0c2 (patch)
tree8c5b542b815f7281f53223b9b0373fd500f3c698 /sql
parentce7ddabbcd330b19f6d0c17082304dfa6e1621b2 (diff)
downloadspark-c736220dac51cf73181fdd7f621c960c4e7bf0c2.tar.gz
spark-c736220dac51cf73181fdd7f621c960c4e7bf0c2.tar.bz2
spark-c736220dac51cf73181fdd7f621c960c4e7bf0c2.zip
[SPARK-6635][SQL] DataFrame.withColumn should replace columns with identical column names
JIRA https://issues.apache.org/jira/browse/SPARK-6635 Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #5541 from viirya/replace_with_column and squashes the following commits: b539c7b [Liang-Chi Hsieh] For comment. 72f35b1 [Liang-Chi Hsieh] DataFrame.withColumn can replace original column with identical column name.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala8
2 files changed, 21 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17c21f6e3a..45f5da3876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -747,7 +747,19 @@ class DataFrame private[sql](
* Returns a new [[DataFrame]] by adding a column.
* @group dfops
*/
- def withColumn(colName: String, col: Column): DataFrame = select(Column("*"), col.as(colName))
+ def withColumn(colName: String, col: Column): DataFrame = {
+ val resolver = sqlContext.analyzer.resolver
+ val replaced = schema.exists(f => resolver(f.name, colName))
+ if (replaced) {
+ val colNames = schema.map { field =>
+ val name = field.name
+ if (resolver(name, colName)) col.as(colName) else Column(name)
+ }
+ select(colNames :_*)
+ } else {
+ select(Column("*"), col.as(colName))
+ }
+ }
/**
* Returns a new [[DataFrame]] with a column renamed.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3250ab476a..b9b6a400ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -473,6 +473,14 @@ class DataFrameSuite extends QueryTest {
assert(df.schema.map(_.name).toSeq === Seq("key", "value", "newCol"))
}
+ test("replace column using withColumn") {
+ val df2 = TestSQLContext.sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
+ val df3 = df2.withColumn("x", df2("x") + 1)
+ checkAnswer(
+ df3.select("x"),
+ Row(2) :: Row(3) :: Row(4) :: Nil)
+ }
+
test("withColumnRenamed") {
val df = testData.toDF().withColumn("newCol", col("key") + 1)
.withColumnRenamed("value", "valueRenamed")