diff options
author | Michael Armbrust <michael@databricks.com> | 2015-02-09 16:02:56 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-09 16:02:56 -0800 |
commit | 68b25cf695e0fce9e465288d5a053e540a3fccb4 (patch) | |
tree | 8896c360d890435af855099d02dae20905a7e90f | |
parent | b884daa58084d4f42e2318894067565b94e07f9d (diff) | |
download | spark-68b25cf695e0fce9e465288d5a053e540a3fccb4.tar.gz spark-68b25cf695e0fce9e465288d5a053e540a3fccb4.tar.bz2 spark-68b25cf695e0fce9e465288d5a053e540a3fccb4.zip |
[SQL] Add some missing DataFrame functions.
- as with a `Symbol`
- distinct
- sqlContext.emptyDataFrame
- move add/remove col out of RDDApi section
Author: Michael Armbrust <michael@databricks.com>
Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits:
2004023 [Michael Armbrust] Add missing functions
6 files changed, 51 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 878b2b0556..1011bf0bb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -550,6 +550,15 @@ trait Column extends DataFrame { override def as(alias: String): Column = exprToColumn(Alias(expr, alias)()) /** + * Gives the column an alias. + * {{{ + * // Renames colA to colB in select output. + * df.select($"colA".as('colB)) + * }}} + */ + override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)()) + + /** * Casts the column to a different data type. * {{{ * // Casts colA to IntegerType. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 17ea3cde8e..6abfb7853c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] { def join(right: DataFrame, joinExprs: Column): DataFrame /** - * Join with another [[DataFrame]], usin g the given join expression. The following performs + * Join with another [[DataFrame]], using the given join expression. The following performs * a full outer join between `df1` and `df2`. * * {{{ @@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] { /** * Returns a new [[DataFrame]] with an alias set. */ - def as(name: String): DataFrame + def as(alias: String): DataFrame + + /** + * (Scala-specific) Returns a new [[DataFrame]] with an alias set. + */ + def as(alias: Symbol): DataFrame /** * Selects a set of expressions. @@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] { */ override def repartition(numPartitions: Int): DataFrame + /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */ + override def distinct: DataFrame + override def persist(): this.type override def persist(newLevel: StorageLevel): this.type diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index fa05a5dcac..73393295ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql]( }.toSeq :_*) } - override def as(name: String): DataFrame = Subquery(name, logicalPlan) + override def as(alias: String): DataFrame = Subquery(alias, logicalPlan) + + override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { val exprs = cols.zipWithIndex.map { @@ -215,7 +217,19 @@ private[sql] class DataFrameImpl protected[sql]( override def selectExpr(exprs: String*): DataFrame = { select(exprs.map { expr => Column(new SqlParser().parseExpression(expr)) - } :_*) + }: _*) + } + + override def addColumn(colName: String, col: Column): DataFrame = { + select(Column("*"), col.as(colName)) + } + + override def renameColumn(existingName: String, newName: String): DataFrame = { + val colNames = schema.map { field => + val name = field.name + if (name == existingName) Column(name).as(newName) else Column(name) + } + select(colNames :_*) } override def filter(condition: Column): DataFrame = { @@ -264,18 +278,8 @@ private[sql] class DataFrameImpl protected[sql]( } ///////////////////////////////////////////////////////////////////////////// - - override def addColumn(colName: String, col: Column): DataFrame = { - select(Column("*"), col.as(colName)) - } - - override def renameColumn(existingName: String, newName: String): DataFrame = { - val colNames = schema.map { field => - val name = field.name - if (name == existingName) Column(name).as(newName) else Column(name) - } - select(colNames :_*) - } + // RDD API + ///////////////////////////////////////////////////////////////////////////// override def head(n: Int): Array[Row] = limit(n).collect() @@ -307,6 +311,8 @@ private[sql] class DataFrameImpl protected[sql]( sqlContext.applySchema(rdd.repartition(numPartitions), schema) } + override def distinct: DataFrame = Distinct(logicalPlan) + override def persist(): this.type = { sqlContext.cacheManager.cacheQuery(this) this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 782f6e28ee..0600dcc226 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -86,6 +86,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def selectExpr(exprs: String*): DataFrame = err() + override def addColumn(colName: String, col: Column): DataFrame = err() + + override def renameColumn(existingName: String, newName: String): DataFrame = err() + override def filter(condition: Column): DataFrame = err() override def filter(conditionExpr: String): DataFrame = err() @@ -110,10 +114,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten ///////////////////////////////////////////////////////////////////////////// - override def addColumn(colName: String, col: Column): DataFrame = err() - - override def renameColumn(existingName: String, newName: String): DataFrame = err() - override def head(n: Int): Array[Row] = err() override def head(): Row = err() @@ -140,6 +140,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def repartition(numPartitions: Int): DataFrame = err() + override def distinct: DataFrame = err() + override def persist(): this.type = err() override def persist(newLevel: StorageLevel): this.type = err() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala index 38e6382f17..df866fd1ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -60,4 +60,6 @@ private[sql] trait RDDApi[T] { def first(): T def repartition(numPartitions: Int): DataFrame + + def distinct: DataFrame } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bf39906710..97e3777f93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} @@ -130,6 +130,9 @@ class SQLContext(@transient val sparkContext: SparkContext) */ val experimental: ExperimentalMethods = new ExperimentalMethods(this) + /** Returns a [[DataFrame]] with no rows or columns. */ + lazy val emptyDataFrame = DataFrame(this, NoRelation) + /** * A collection of methods for registering user-defined functions (UDF). * |