author    Michael Armbrust <michael@databricks.com>  2015-02-09 16:02:56 -0800
committer Michael Armbrust <michael@databricks.com>  2015-02-09 16:02:56 -0800
commit    68b25cf695e0fce9e465288d5a053e540a3fccb4 (patch)
tree      8896c360d890435af855099d02dae20905a7e90f
parent    b884daa58084d4f42e2318894067565b94e07f9d (diff)
[SQL] Add some missing DataFrame functions.
- as with a `Symbol`
- distinct
- sqlContext.emptyDataFrame
- move add/remove col out of RDDApi section

Author: Michael Armbrust <michael@databricks.com>

Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits:

2004023 [Michael Armbrust] Add missing functions
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Column.scala             |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala          | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala      | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala             |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala         |  5
6 files changed, 51 insertions(+), 21 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 878b2b0556..1011bf0bb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -550,6 +550,15 @@ trait Column extends DataFrame {
override def as(alias: String): Column = exprToColumn(Alias(expr, alias)())
/**
+ * Gives the column an alias.
+ * {{{
+ * // Renames colA to colB in select output.
+ * df.select($"colA".as('colB))
+ * }}}
+ */
+ override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)())
+
+ /**
* Casts the column to a different data type.
* {{{
* // Casts colA to IntegerType.
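The Symbol overload above delegates to the same Alias expression as the existing String overload, so the two forms are interchangeable in select lists. A minimal sketch of both, assuming a local build at this commit; the app name, schema, and sample rows are illustrative only:

{{{
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sc = new SparkContext(new SparkConf().setAppName("df-demo").setMaster("local"))
val sqlContext = new SQLContext(sc)

// Build a small DataFrame with applySchema, the row-RDD builder this patch
// itself uses in DataFrameImpl. Note the deliberate duplicate (2, "b") row;
// it is reused by the distinct example further down.
val schema = StructType(Seq(
  StructField("colA", IntegerType, nullable = false),
  StructField("colB", StringType, nullable = false)))
val df = sqlContext.applySchema(
  sc.parallelize(Seq(Row(1, "a"), Row(2, "b"), Row(2, "b"))), schema)

// String and Symbol aliases yield the same output column name.
df.select(df("colA").as("renamed")).collect().foreach(println)
df.select(df("colA").as('renamed)).collect().foreach(println)
}}}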
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17ea3cde8e..6abfb7853c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] {
def join(right: DataFrame, joinExprs: Column): DataFrame
/**
- * Join with another [[DataFrame]], usin g the given join expression. The following performs
+ * Join with another [[DataFrame]], using the given join expression. The following performs
* a full outer join between `df1` and `df2`.
*
* {{{
@@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] {
/**
* Returns a new [[DataFrame]] with an alias set.
*/
- def as(name: String): DataFrame
+ def as(alias: String): DataFrame
+
+ /**
+ * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
+ */
+ def as(alias: Symbol): DataFrame
/**
* Selects a set of expressions.
@@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] {
*/
override def repartition(numPartitions: Int): DataFrame
+ /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
+ override def distinct: DataFrame
+
override def persist(): this.type
override def persist(newLevel: StorageLevel): this.type
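The DataFrame-level Symbol overload is plain Scala sugar over the String version (both plan a Subquery). Its typical use is tagging the two sides of a self-join so otherwise-identical column names can be qualified. A sketch, continuing the session from the sketch above; the qualified references like "l.colA" rely on ordinary Catalyst attribute resolution and the names are illustrative:

{{{
// Alias each side of a self-join, then disambiguate columns by qualifier.
val joined = df.as('l).join(df.as("r"), Column("l.colA") === Column("r.colA"))
joined.select(Column("l.colA"), Column("r.colB")).collect().foreach(println)
}}}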
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index fa05a5dcac..73393295ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql](
}.toSeq :_*)
}
- override def as(name: String): DataFrame = Subquery(name, logicalPlan)
+ override def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+
+ override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
override def select(cols: Column*): DataFrame = {
val exprs = cols.zipWithIndex.map {
@@ -215,7 +217,19 @@ private[sql] class DataFrameImpl protected[sql](
override def selectExpr(exprs: String*): DataFrame = {
select(exprs.map { expr =>
Column(new SqlParser().parseExpression(expr))
- } :_*)
+ }: _*)
+ }
+
+ override def addColumn(colName: String, col: Column): DataFrame = {
+ select(Column("*"), col.as(colName))
+ }
+
+ override def renameColumn(existingName: String, newName: String): DataFrame = {
+ val colNames = schema.map { field =>
+ val name = field.name
+ if (name == existingName) Column(name).as(newName) else Column(name)
+ }
+ select(colNames :_*)
}
override def filter(condition: Column): DataFrame = {
@@ -264,18 +278,8 @@ private[sql] class DataFrameImpl protected[sql](
}
/////////////////////////////////////////////////////////////////////////////
-
- override def addColumn(colName: String, col: Column): DataFrame = {
- select(Column("*"), col.as(colName))
- }
-
- override def renameColumn(existingName: String, newName: String): DataFrame = {
- val colNames = schema.map { field =>
- val name = field.name
- if (name == existingName) Column(name).as(newName) else Column(name)
- }
- select(colNames :_*)
- }
+ // RDD API
+ /////////////////////////////////////////////////////////////////////////////
override def head(n: Int): Array[Row] = limit(n).collect()
@@ -307,6 +311,8 @@ private[sql] class DataFrameImpl protected[sql](
sqlContext.applySchema(rdd.repartition(numPartitions), schema)
}
+ override def distinct: DataFrame = Distinct(logicalPlan)
+
override def persist(): this.type = {
sqlContext.cacheManager.cacheQuery(this)
this
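As the relocated implementations show, addColumn is just a select of "*" plus one aliased expression, and renameColumn is a select that rewrites only the matching name, so both return new projections and leave the source DataFrame untouched. A sketch, continuing the same session (these methods were renamed withColumn/withColumnRenamed before the 1.3 release, but carry these names at this commit):

{{{
// Append a derived column, then rename an existing one; df itself is unchanged.
val withSum = df.addColumn("doubled", df("colA") + df("colA"))
val renamed = withSum.renameColumn("colB", "label")
renamed.collect().foreach(println)  // rows now carry colA, label, doubled
}}}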
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 782f6e28ee..0600dcc226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -86,6 +86,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def selectExpr(exprs: String*): DataFrame = err()
+ override def addColumn(colName: String, col: Column): DataFrame = err()
+
+ override def renameColumn(existingName: String, newName: String): DataFrame = err()
+
override def filter(condition: Column): DataFrame = err()
override def filter(conditionExpr: String): DataFrame = err()
@@ -110,10 +114,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
/////////////////////////////////////////////////////////////////////////////
- override def addColumn(colName: String, col: Column): DataFrame = err()
-
- override def renameColumn(existingName: String, newName: String): DataFrame = err()
-
override def head(n: Int): Array[Row] = err()
override def head(): Row = err()
@@ -140,6 +140,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def repartition(numPartitions: Int): DataFrame = err()
+ override def distinct: DataFrame = err()
+
override def persist(): this.type = err()
override def persist(newLevel: StorageLevel): this.type = err()
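Because Column extends DataFrame in this design, every DataFrame method is callable on a bare column expression; IncomputableColumn backs columns that are not attached to a query plan, and the overrides added here make the new methods fail fast like the rest. A sketch, continuing the same session (the exact exception type and message are implementation details, so the catch below is deliberately broad):

{{{
// A free-standing column has no underlying plan, so DataFrame operations
// on it throw rather than compute.
try {
  Column("someCol").distinct.collect()
} catch {
  case e: Exception => println("as expected: " + e)
}
}}}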
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index 38e6382f17..df866fd1ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -60,4 +60,6 @@ private[sql] trait RDDApi[T] {
def first(): T
def repartition(numPartitions: Int): DataFrame
+
+ def distinct: DataFrame
}
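distinct completes the RDD-flavored surface collected in RDDApi, but unlike RDD.distinct it goes through the planner: as DataFrameImpl shows, it simply wraps the logical plan in the Distinct operator. Continuing the same session:

{{{
// df was built above with a duplicate (2, "b") row; distinct collapses it.
df.distinct.collect().foreach(println)
}}}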
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bf39906710..97e3777f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -130,6 +130,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
val experimental: ExperimentalMethods = new ExperimentalMethods(this)
+ /** Returns a [[DataFrame]] with no rows or columns. */
+ lazy val emptyDataFrame = DataFrame(this, NoRelation)
+
/**
* A collection of methods for registering user-defined functions (UDF).
*
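emptyDataFrame is a lazy val over the NoRelation logical plan, i.e. a DataFrame with no rows and no columns, which makes a cheap placeholder for tests and a well-defined "nothing" value. Continuing the same session:

{{{
val empty = sqlContext.emptyDataFrame
println(empty.schema)            // StructType with zero fields
println(empty.collect().length)  // 0
}}}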