From ee04413935f74b3178adbb6d8dee19b3320803e9 Mon Sep 17 00:00:00 2001 From: rakeshchalasani Date: Thu, 30 Apr 2015 17:42:50 -0700 Subject: [SPARK-7280][SQL] Add "drop" column/s on a data frame Takes a column name/s and returns a new DataFrame that drops a column/s. Author: rakeshchalasani Closes #5818 from rakeshchalasani/SPARK-7280 and squashes the following commits: ce2ec09 [rakeshchalasani] Minor edit 45c06f1 [rakeshchalasani] Change withColumnRename and format changes f68945a [rakeshchalasani] Minor fix 0b9104d [rakeshchalasani] Drop one column at a time 289afd2 [rakeshchalasani] [SPARK-7280][SQL] Add "drop" column/s on a data frame --- .../scala/org/apache/spark/sql/DataFrame.scala | 33 +++++++++++++++++++--- .../org/apache/spark/sql/DataFrameSuite.scala | 16 +++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 7be2a01da1..c421006c8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -851,15 +851,40 @@ class DataFrame private[sql]( /** * Returns a new [[DataFrame]] with a column renamed. + * This is a no-op if schema doesn't contain existingName. * @group dfops */ def withColumnRenamed(existingName: String, newName: String): DataFrame = { val resolver = sqlContext.analyzer.resolver - val colNames = schema.map { field => - val name = field.name - if (resolver(name, existingName)) Column(name).as(newName) else Column(name) + val shouldRename = schema.exists(f => resolver(f.name, existingName)) + if (shouldRename) { + val colNames = schema.map { field => + val name = field.name + if (resolver(name, existingName)) Column(name).as(newName) else Column(name) + } + select(colNames : _*) + } else { + this + } + } + + /** + * Returns a new [[DataFrame]] with a column dropped. + * This is a no-op if schema doesn't contain column name. + * @group dfops + */ + def drop(colName: String): DataFrame = { + val resolver = sqlContext.analyzer.resolver + val shouldDrop = schema.exists(f => resolver(f.name, colName)) + if (shouldDrop) { + val colsAfterDrop = schema.filter { field => + val name = field.name + !resolver(name, colName) + }.map(f => Column(f.name)) + select(colsAfterDrop : _*) + } else { + this } - select(colNames :_*) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b70e127b4e..e286fef23c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -499,6 +499,22 @@ class DataFrameSuite extends QueryTest { Row(2) :: Row(3) :: Row(4) :: Nil) } + test("drop column using drop") { + val df = testData.drop("key") + checkAnswer( + df, + testData.collect().map(x => Row(x.getString(1))).toSeq) + assert(df.schema.map(_.name) === Seq("value")) + } + + test("drop unknown column (no-op)") { + val df = testData.drop("random") + checkAnswer( + df, + testData.collect().toSeq) + assert(df.schema.map(_.name) === Seq("key","value")) + } + test("withColumnRenamed") { val df = testData.toDF().withColumn("newCol", col("key") + 1) .withColumnRenamed("value", "valueRenamed") -- cgit v1.2.3